diff --git a/Cargo.lock b/Cargo.lock index 6e97261020..5f09a11d01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2931,6 +2931,7 @@ dependencies = [ "datafusion_util", "executor", "futures", + "generated_types", "hashbrown 0.13.2", "influxdb_influxql_parser", "insta", @@ -2945,6 +2946,7 @@ dependencies = [ "regex", "schema", "serde", + "serde_json", "snafu", "test_helpers", "tokio", diff --git a/generated_types/protos/influxdata/iox/querier/v1/flight.proto b/generated_types/protos/influxdata/iox/querier/v1/flight.proto index 7b183e4682..b2b537cf5d 100644 --- a/generated_types/protos/influxdata/iox/querier/v1/flight.proto +++ b/generated_types/protos/influxdata/iox/querier/v1/flight.proto @@ -54,3 +54,30 @@ message ReadInfo { // provide data lineage information, statistics, watermarks or other // information in the future. message AppMetadata {} + +// A structure which describes the layout of the group key in a `RecordBatch`. +// This information is used to map the data in a `RecordBatch` to the InfluxDB data model +// where in addition to a data type, each column is either a `tag`, `field` or `timestamp` +// +// Typically, this structure is encoded in the schema of a `RecordBatch`. +message InfluxQlMetadata { uint32 measurement_column_index = 1; + // Provides additional metadata about a column that is used + // to form part of the group key. + message TagKeyColumn { + // The tag key name. + string tag_key = 1; + // The column index of the tag values. + uint32 column_index = 2; + // `true` if the tag key column is also a projected column in original query. + bool is_projected = 3; + } + + // A list of tag key names and associated metadata. + // + // **Note**: + // The vector is sorted by the `tag_key` field in lexicographically + // ascending order. 
+ repeated TagKeyColumn tag_key_columns = 2; +} \ No newline at end of file diff --git a/influxdb_influxql_parser/src/expression/arithmetic.rs b/influxdb_influxql_parser/src/expression/arithmetic.rs index 50fdad5b5a..37b73f0b7c 100644 --- a/influxdb_influxql_parser/src/expression/arithmetic.rs +++ b/influxdb_influxql_parser/src/expression/arithmetic.rs @@ -418,7 +418,9 @@ where )(i) } -/// Parse a function call expression +/// Parse a function call expression. +/// +/// The `name` field of the [`Expr::Call`] variant is guaranteed to be in lowercase. pub(crate) fn call_expression(i: &str) -> ParseResult<&str, Expr> where T: ArithmeticParsers, @@ -427,10 +429,9 @@ where separated_pair( // special case to handle `DISTINCT`, which is allowed as an identifier // in a call expression - map( - alt((unquoted_identifier, keyword("DISTINCT"))), - &str::to_string, - ), + map(alt((unquoted_identifier, keyword("DISTINCT"))), |n| { + n.to_ascii_lowercase() + }), ws0, delimited( char('('), @@ -874,19 +875,19 @@ mod test { // tests. 
// No arguments - assert_call("FN()", "FN()"); + assert_call("FN()", "fn()"); // Single argument with surrounding whitespace - assert_call("FN ( 1 )", "FN(1)"); + assert_call("FN ( 1 )", "fn(1)"); // Multiple arguments with varying whitespace - assert_call("FN ( 1,2\n,3,\t4 )", "FN(1, 2, 3, 4)"); + assert_call("FN ( 1,2\n,3,\t4 )", "fn(1, 2, 3, 4)"); // Arguments as expressions - assert_call("FN ( 1 + 2, foo, 'bar' )", "FN(1 + 2, foo, 'bar')"); + assert_call("FN ( 1 + 2, foo, 'bar' )", "fn(1 + 2, foo, 'bar')"); // A single regular expression argument - assert_call("FN ( /foo/ )", "FN(/foo/)"); + assert_call("FN ( /foo/ )", "fn(/foo/)"); // Fallible cases diff --git a/influxdb_influxql_parser/src/select.rs b/influxdb_influxql_parser/src/select.rs index a766f73c5a..57e51fc769 100644 --- a/influxdb_influxql_parser/src/select.rs +++ b/influxdb_influxql_parser/src/select.rs @@ -886,7 +886,7 @@ mod test { assert_eq!( got, Field { - expr: call!("COUNT", var_ref!("foo")), + expr: call!("count", var_ref!("foo")), alias: Some("bar".into()) } ); @@ -896,7 +896,7 @@ mod test { assert_eq!( got, Field { - expr: call!("LAST", var_ref!("n.asks")), + expr: call!("last", var_ref!("n.asks")), alias: None } ); @@ -916,7 +916,7 @@ mod test { assert_eq!( got, Field { - expr: call!("COUNT", distinct!("foo")), + expr: call!("count", distinct!("foo")), alias: Some("bar".into()) } ); @@ -926,7 +926,7 @@ mod test { assert_eq!( got, Field { - expr: call!("COUNT", call!("DISTINCT", var_ref!("foo"))), + expr: call!("count", call!("distinct", var_ref!("foo"))), alias: None } ); @@ -966,7 +966,7 @@ mod test { assert_eq!( got, Field { - expr: call!("COUNT", wildcard!()), + expr: call!("count", wildcard!()), alias: None, } ); diff --git a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-3.snap b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-3.snap index ce1ceda1ca..a5d96601cd 100644 --- 
a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-3.snap +++ b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-3.snap @@ -2,16 +2,16 @@ source: influxdb_influxql_parser/src/visit.rs expression: "visit_statement!(r#\"SELECT COUNT(value) FROM temp\"#)" --- -- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" -- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" -- "pre_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" -- "pre_visit_expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" +- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore 
{ contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" +- "pre_visit_select_field: Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" +- "pre_visit_expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" - "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }" - "post_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }" -- "post_visit_expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" -- "post_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" -- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" +- "post_visit_expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" +- "post_visit_select_field: Field { expr: Call { name: \"count\", args: [VarRef { 
name: Identifier(\"value\"), data_type: None }] }, alias: None }" +- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" - "pre_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" - "pre_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "pre_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" @@ -20,6 +20,6 @@ expression: "visit_statement!(r#\"SELECT COUNT(value) FROM temp\"#)" - "post_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" - "post_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "post_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" -- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: 
[Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" diff --git a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-4.snap b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-4.snap index 3bd0f33201..12bca481ae 100644 --- a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-4.snap +++ b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit__test__select_statement-4.snap @@ -2,16 +2,16 @@ source: influxdb_influxql_parser/src/visit.rs expression: "visit_statement!(r#\"SELECT COUNT(DISTINCT value) FROM temp\"#)" --- -- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: 
[Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" -- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" -- "pre_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" -- "pre_visit_expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }" +- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, 
group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" +- "pre_visit_select_field: Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" +- "pre_visit_expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }" - "pre_visit_expr: Distinct(Identifier(\"value\"))" - "post_visit_expr: Distinct(Identifier(\"value\"))" -- "post_visit_expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }" -- "post_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" -- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" +- "post_visit_expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }" +- "post_visit_select_field: Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" +- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" - "pre_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" - "pre_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "pre_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" @@ -20,6 +20,6 @@ expression: "visit_statement!(r#\"SELECT COUNT(DISTINCT value) FROM temp\"#)" - "post_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, 
retention_policy: None, name: Name(Identifier(\"temp\")) }" - "post_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "post_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" -- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { 
contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" diff --git a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-3.snap b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-3.snap index a9b7e9b1fb..82480175ad 100644 --- a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-3.snap +++ b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-3.snap @@ -2,16 +2,16 @@ source: influxdb_influxql_parser/src/visit_mut.rs expression: "visit_statement!(r#\"SELECT COUNT(value) FROM temp\"#)" --- -- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" -- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { 
name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" -- "pre_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" -- "pre_visit_expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" +- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" +- "pre_visit_select_field: Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" +- "pre_visit_expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" - "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }" - "post_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }" -- "post_visit_expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), 
data_type: None }] }" -- "post_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" -- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" +- "post_visit_expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }" +- "post_visit_select_field: Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }" +- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }" - "pre_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" - "pre_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "pre_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" @@ -20,6 +20,6 @@ expression: "visit_statement!(r#\"SELECT COUNT(value) FROM temp\"#)" - "post_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" - "post_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "post_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" -- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] 
}, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [VarRef { name: Identifier(\"value\"), data_type: None }] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" diff --git a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-4.snap 
b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-4.snap index 98531747ed..23783a1cbd 100644 --- a/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-4.snap +++ b/influxdb_influxql_parser/src/snapshots/influxdb_influxql_parser__visit_mut__test__select_statement-4.snap @@ -2,16 +2,16 @@ source: influxdb_influxql_parser/src/visit_mut.rs expression: "visit_statement!(r#\"SELECT COUNT(DISTINCT value) FROM temp\"#)" --- -- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" -- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" -- "pre_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" -- "pre_visit_expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }" +- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: 
None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" +- "pre_visit_select_field: Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" +- "pre_visit_expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }" - "pre_visit_expr: Distinct(Identifier(\"value\"))" - "post_visit_expr: Distinct(Identifier(\"value\"))" -- "post_visit_expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }" -- "post_visit_select_field: Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" -- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" +- "post_visit_expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }" +- "post_visit_select_field: Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }" +- "post_visit_select_field_list: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }" - 
"pre_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" - "pre_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "pre_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" @@ -20,6 +20,6 @@ expression: "visit_statement!(r#\"SELECT COUNT(DISTINCT value) FROM temp\"#)" - "post_visit_qualified_measurement_name: QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) }" - "post_visit_select_measurement_selection: Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })" - "post_visit_select_from_clause: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }" -- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" -- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"COUNT\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" +- 
"post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }" +- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: Call { name: \"count\", args: [Distinct(Identifier(\"value\"))] }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"temp\")) })] }, condition: None, group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })" diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql index 30a42f17de..8bb474f614 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql @@ -258,4 +258,22 @@ SELECT f64, non_existing, f64 + non_existing FROM m0 WHERE f64 > 19; -- Multiple measurements in the FROM clause -- -SELECT usage_idle, bytes_used FROM cpu, disk; \ No newline at end of file +SELECT usage_idle, bytes_used FROM cpu, disk; + +-- +-- GROUP BY +-- + +-- Validate ordering without GROUP BY +SELECT cpu, usage_idle FROM cpu; + +-- Validate various GROUP BY scenarios +SELECT usage_idle FROM cpu GROUP BY cpu; +SELECT usage_idle, cpu FROM cpu GROUP BY cpu; + +-- multiple measurements and tags in the group by + +SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu; +SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device; +SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu; +SELECT 
usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu; diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected index b4198d805e..c14a6e8574 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected @@ -409,8 +409,117 @@ +------------------+----------------------+------------+------------+ | iox::measurement | time | usage_idle | bytes_used | +------------------+----------------------+------------+------------+ +| cpu | 2022-10-31T02:00:00Z | 2.98 | | | cpu | 2022-10-31T02:00:00Z | 0.98 | | +| cpu | 2022-10-31T02:00:00Z | 1.98 | | +| cpu | 2022-10-31T02:00:10Z | 2.99 | | | cpu | 2022-10-31T02:00:10Z | 0.99 | | +| cpu | 2022-10-31T02:00:10Z | 1.99 | | | disk | 2022-10-31T02:00:00Z | | 219838.0 | +| disk | 2022-10-31T02:00:00Z | | 319838.0 | +| disk | 2022-10-31T02:00:00Z | | 419838.0 | | disk | 2022-10-31T02:00:10Z | | 219833.0 | -+------------------+----------------------+------------+------------+ \ No newline at end of file +| disk | 2022-10-31T02:00:10Z | | 319833.0 | +| disk | 2022-10-31T02:00:10Z | | 419833.0 | ++------------------+----------------------+------------+------------+ +-- InfluxQL: SELECT cpu, usage_idle FROM cpu; ++------------------+----------------------+-----------+------------+ +| iox::measurement | time | cpu | usage_idle | ++------------------+----------------------+-----------+------------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2.98 | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 0.98 | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 1.98 | +| cpu | 2022-10-31T02:00:10Z | cpu-total | 2.99 | +| cpu | 2022-10-31T02:00:10Z | cpu0 | 0.99 | +| cpu | 2022-10-31T02:00:10Z | cpu1 | 1.99 | ++------------------+----------------------+-----------+------------+ +-- InfluxQL: SELECT usage_idle FROM cpu GROUP BY cpu; 
++------------------+----------------------+-----------+------------+ +| iox::measurement | time | cpu | usage_idle | ++------------------+----------------------+-----------+------------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2.98 | +| cpu | 2022-10-31T02:00:10Z | cpu-total | 2.99 | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 0.98 | +| cpu | 2022-10-31T02:00:10Z | cpu0 | 0.99 | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 1.98 | +| cpu | 2022-10-31T02:00:10Z | cpu1 | 1.99 | ++------------------+----------------------+-----------+------------+ +-- InfluxQL: SELECT usage_idle, cpu FROM cpu GROUP BY cpu; ++------------------+----------------------+------------+-----------+ +| iox::measurement | time | usage_idle | cpu | ++------------------+----------------------+------------+-----------+ +| cpu | 2022-10-31T02:00:00Z | 2.98 | cpu-total | +| cpu | 2022-10-31T02:00:10Z | 2.99 | cpu-total | +| cpu | 2022-10-31T02:00:00Z | 0.98 | cpu0 | +| cpu | 2022-10-31T02:00:10Z | 0.99 | cpu0 | +| cpu | 2022-10-31T02:00:00Z | 1.98 | cpu1 | +| cpu | 2022-10-31T02:00:10Z | 1.99 | cpu1 | ++------------------+----------------------+------------+-----------+ +-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu; ++------------------+----------------------+-----------+------------+------------+ +| iox::measurement | time | cpu | usage_idle | bytes_free | ++------------------+----------------------+-----------+------------+------------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu-total | 2.99 | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 0.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu0 | 0.99 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 1.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu1 | 1.99 | | +| disk | 2022-10-31T02:00:00Z | | | 1234.0 | +| disk | 2022-10-31T02:00:00Z | | | 2234.0 | +| disk | 2022-10-31T02:00:00Z | | | 3234.0 | +| disk | 2022-10-31T02:00:10Z | | | 1239.0 | +| disk | 2022-10-31T02:00:10Z | | | 2239.0 | +| disk | 
2022-10-31T02:00:10Z | | | 3239.0 | ++------------------+----------------------+-----------+------------+------------+ +-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device; ++------------------+----------------------+-----------+---------+------------+------------+ +| iox::measurement | time | cpu | device | usage_idle | bytes_free | ++------------------+----------------------+-----------+---------+------------+------------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu-total | | 2.99 | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | | 0.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234.0 | +| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239.0 | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234.0 | +| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239.0 | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234.0 | +| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239.0 | ++------------------+----------------------+-----------+---------+------------+------------+ +-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu; ++------------------+----------------------+-----------+---------+------------+------------+ +| iox::measurement | time | cpu | device | usage_idle | bytes_free | ++------------------+----------------------+-----------+---------+------------+------------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu-total | | 2.99 | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | | 0.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | | +| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234.0 | +| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239.0 | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | 
| 2234.0 | +| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239.0 | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234.0 | +| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239.0 | ++------------------+----------------------+-----------+---------+------------+------------+ +-- InfluxQL: SELECT usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu; ++------------------+----------------------+------------+------------+---------+-----------+ +| iox::measurement | time | usage_idle | bytes_free | device | cpu | ++------------------+----------------------+------------+------------+---------+-----------+ +| cpu | 2022-10-31T02:00:00Z | 2.98 | | | cpu-total | +| cpu | 2022-10-31T02:00:10Z | 2.99 | | | cpu-total | +| cpu | 2022-10-31T02:00:00Z | 0.98 | | | cpu0 | +| cpu | 2022-10-31T02:00:10Z | 0.99 | | | cpu0 | +| cpu | 2022-10-31T02:00:00Z | 1.98 | | | cpu1 | +| cpu | 2022-10-31T02:00:10Z | 1.99 | | | cpu1 | +| disk | 2022-10-31T02:00:00Z | | 1234.0 | disk1s1 | | +| disk | 2022-10-31T02:00:10Z | | 1239.0 | disk1s1 | | +| disk | 2022-10-31T02:00:00Z | | 2234.0 | disk1s2 | | +| disk | 2022-10-31T02:00:10Z | | 2239.0 | disk1s2 | | +| disk | 2022-10-31T02:00:00Z | | 3234.0 | disk1s5 | | +| disk | 2022-10-31T02:00:10Z | | 3239.0 | disk1s5 | | ++------------------+----------------------+------------+------------+---------+-----------+ \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests2/setups.rs b/influxdb_iox/tests/query_tests2/setups.rs index 14bf2ba253..b3b174bdaf 100644 --- a/influxdb_iox/tests/query_tests2/setups.rs +++ b/influxdb_iox/tests/query_tests2/setups.rs @@ -1233,10 +1233,18 @@ pub static SETUPS: Lazy> = Lazy::new(|| { m1,tag0=val00 f64=100.5,i64=1001i,str="hi" 1667181600000000000 m1,tag0=val00 f64=200.6,i64=2001i,str="lo" 1667181610000000000 m1,tag0=val01 f64=101.7,i64=1011i,str="lo" 1667181600000000000 + cpu,host=host1,cpu=cpu-total usage_idle=2.98,usage_system=2.2 1667181600000000000 + cpu,host=host1,cpu=cpu-total 
usage_idle=2.99,usage_system=2.1 1667181610000000000 cpu,host=host1,cpu=cpu0 usage_idle=0.98,usage_system=0.2 1667181600000000000 cpu,host=host1,cpu=cpu0 usage_idle=0.99,usage_system=0.1 1667181610000000000 + cpu,host=host1,cpu=cpu1 usage_idle=1.98,usage_system=1.2 1667181600000000000 + cpu,host=host1,cpu=cpu1 usage_idle=1.99,usage_system=1.1 1667181610000000000 disk,host=host1,device=disk1s1 bytes_free=1234,bytes_used=219838 1667181600000000000 disk,host=host1,device=disk1s1 bytes_free=1239,bytes_used=219833 1667181610000000000 + disk,host=host1,device=disk1s2 bytes_free=2234,bytes_used=319838 1667181600000000000 + disk,host=host1,device=disk1s2 bytes_free=2239,bytes_used=319833 1667181610000000000 + disk,host=host1,device=disk1s5 bytes_free=3234,bytes_used=419838 1667181600000000000 + disk,host=host1,device=disk1s5 bytes_free=3239,bytes_used=419833 1667181610000000000 "# .to_string(), ), diff --git a/iox_query/Cargo.toml b/iox_query/Cargo.toml index 49c2f2a245..6b2df0d063 100644 --- a/iox_query/Cargo.toml +++ b/iox_query/Cargo.toml @@ -25,6 +25,7 @@ datafusion = { workspace = true } datafusion_util = { path = "../datafusion_util" } executor = { path = "../executor"} futures = "0.3" +generated_types = { path = "../generated_types" } hashbrown = { workspace = true } influxdb_influxql_parser = { path = "../influxdb_influxql_parser" } itertools = "0.10.5" @@ -36,6 +37,7 @@ parquet_file = { path = "../parquet_file" } query_functions = { path = "../query_functions"} regex = "1" schema = { path = "../schema" } +serde_json = "1.0.93" snafu = "0.7" tokio = { version = "1.26", features = ["macros", "parking_lot"] } tokio-stream = "0.1" diff --git a/iox_query/src/frontend/influxql.rs b/iox_query/src/frontend/influxql.rs index e6e0503e9d..5eb70f28fb 100644 --- a/iox_query/src/frontend/influxql.rs +++ b/iox_query/src/frontend/influxql.rs @@ -1,12 +1,20 @@ +use arrow::datatypes::SchemaRef; +use std::any::Any; use std::collections::{HashMap, HashSet}; +use std::fmt; +use 
std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; use crate::exec::context::IOxSessionContext; use crate::plan::influxql; use crate::plan::influxql::{InfluxQLToLogicalPlan, SchemaProvider}; +use datafusion::common::Statistics; use datafusion::datasource::provider_as_source; +use datafusion::execution::context::TaskContext; use datafusion::logical_expr::{LogicalPlan, TableSource}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::{Partitioning, SendableRecordBatchStream}; use datafusion::{ error::{DataFusionError, Result}, physical_plan::ExecutionPlan, @@ -43,6 +51,61 @@ impl SchemaProvider for ContextSchemaProvider { } } +/// A physical operator that overrides the `schema` API, +/// to return an amended version owned by `SchemaExec`. The +/// principal use case is to add additional metadata to the schema. +struct SchemaExec { + input: Arc, + schema: SchemaRef, +} + +impl Debug for SchemaExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SchemaExec") + } +} + +impl ExecutionPlan for SchemaExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn output_partitioning(&self) -> Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + unimplemented!() + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.input.execute(partition, context) + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } +} + /// This struct can create plans for running SQL queries against databases #[derive(Debug, Default)] pub struct InfluxQLQueryPlanner {} @@ -65,9 +128,20 @@ impl InfluxQLQueryPlanner { let statement = self.query_to_statement(query)?; let logical_plan = 
self.statement_to_plan(statement, &ctx).await?; - // This would only work for SELECT statements at the moment, as the schema queries do - // not return ExecutionPlan - ctx.create_physical_plan(&logical_plan).await + let input = ctx.create_physical_plan(&logical_plan).await?; + + // Merge schema-level metadata from the logical plan with the + // schema from the physical plan, as it is not propagated through the + // physical planning process. + let input_schema = input.schema(); + let mut md = input_schema.metadata().clone(); + md.extend(logical_plan.schema().metadata().clone()); + let schema = Arc::new(arrow::datatypes::Schema::new_with_metadata( + input_schema.fields().clone(), + md, + )); + + Ok(Arc::new(SchemaExec { input, schema })) } async fn statement_to_plan( diff --git a/iox_query/src/plan/influxql/field.rs b/iox_query/src/plan/influxql/field.rs index 41e04ba943..678fc32647 100644 --- a/iox_query/src/plan/influxql/field.rs +++ b/iox_query/src/plan/influxql/field.rs @@ -100,10 +100,10 @@ mod test { assert_eq!(field_name(&f), "usage"); let f = get_first_field("SELECT COUNT(usage) FROM cpu"); - assert_eq!(field_name(&f), "COUNT"); + assert_eq!(field_name(&f), "count"); let f = get_first_field("SELECT COUNT(usage) + SUM(usage_idle) FROM cpu"); - assert_eq!(field_name(&f), "COUNT_usage_SUM_usage_idle"); + assert_eq!(field_name(&f), "count_usage_sum_usage_idle"); let f = get_first_field("SELECT 1+2 FROM cpu"); assert_eq!(field_name(&f), ""); diff --git a/iox_query/src/plan/influxql/planner.rs b/iox_query/src/plan/influxql/planner.rs index 928efc337f..3b465ac256 100644 --- a/iox_query/src/plan/influxql/planner.rs +++ b/iox_query/src/plan/influxql/planner.rs @@ -7,7 +7,7 @@ use crate::plan::influxql::var_ref::{ }; use crate::DataFusionError; use arrow::datatypes::DataType; -use datafusion::common::{Result, ScalarValue, ToDFSchema}; +use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema}; use 
datafusion::logical_expr::expr_rewriter::{normalize_col, ExprRewritable, ExprRewriter}; use datafusion::logical_expr::logical_plan::builder::project; use datafusion::logical_expr::logical_plan::Analyze; @@ -16,12 +16,16 @@ use datafusion::logical_expr::{ LogicalPlanBuilder, Operator, PlanType, Projection, TableSource, ToStringifiedPlan, }; use datafusion_util::{lit_dict, AsExpr}; +use generated_types::influxdata::iox::querier::v1::{ + influx_ql_metadata::TagKeyColumn, InfluxQlMetadata, +}; use influxdb_influxql_parser::common::OrderByClause; use influxdb_influxql_parser::explain::{ExplainOption, ExplainStatement}; +use influxdb_influxql_parser::expression::walk::walk_expr; use influxdb_influxql_parser::expression::{ BinaryOperator, ConditionalExpression, ConditionalOperator, VarRefDataType, }; -use influxdb_influxql_parser::select::{SLimitClause, SOffsetClause}; +use influxdb_influxql_parser::select::{Dimension, SLimitClause, SOffsetClause}; use influxdb_influxql_parser::{ common::{LimitClause, MeasurementName, OffsetClause, WhereClause}, expression::Expr as IQLExpr, @@ -33,14 +37,20 @@ use influxdb_influxql_parser::{ use itertools::Itertools; use once_cell::sync::Lazy; use query_functions::clean_non_meta_escapes; -use schema::{InfluxColumnType, InfluxFieldType, Schema}; -use std::collections::{HashSet, VecDeque}; +use schema::{ + InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME, + INFLUXQL_METADATA_KEY, +}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt::Debug; use std::iter; -use std::ops::Deref; +use std::ops::{ControlFlow, Deref}; use std::str::FromStr; use std::sync::Arc; +/// The column index of the measurement column. +const MEASUREMENT_COLUMN_INDEX: u32 = 0; + /// The `SchemaProvider` trait allows the InfluxQL query planner to obtain /// meta-data about tables referenced in InfluxQL statements. 
pub trait SchemaProvider { @@ -159,28 +169,176 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fn select_statement_to_plan(&self, select: &SelectStatement) -> Result { let mut plans = self.plan_from_tables(&select.from)?; + // Aggregate functions are currently not supported. + // + // See: https://github.com/influxdata/influxdb_iox/issues/6919 + if has_aggregate_exprs(&select.fields) { + return Err(DataFusionError::NotImplemented( + "aggregate functions".to_owned(), + )); + } + + let mut meta = InfluxQlMetadata { + measurement_column_index: MEASUREMENT_COLUMN_INDEX, + tag_key_columns: Vec::new(), + }; + + // The `time` column is always present in the result set + let mut fields = if !has_time_column(&select.fields) { + vec![Field { + expr: IQLExpr::VarRef { + name: "time".into(), + data_type: Some(VarRefDataType::Timestamp), + }, + alias: None, + }] + } else { + vec![] + }; + + let (group_by_tag_set, projection_tag_set) = if let Some(group_by) = &select.group_by { + let mut tag_columns = find_tag_columns::>(&select.fields); + + // Contains the list of tag keys specified in the `GROUP BY` clause + let (tag_set, is_projected): (Vec<_>, Vec<_>) = group_by + .iter() + .map(|dimension| match dimension { + Dimension::Tag(t) => { + Ok((t.deref().as_str(), tag_columns.contains(t.deref().as_str()))) + } + // TODO(sgc): https://github.com/influxdata/influxdb_iox/issues/6915 + Dimension::Time { .. } => { + Err(DataFusionError::NotImplemented("GROUP BY time".to_owned())) + } + // Inconsistent state, as these variants should have been expanded by `rewrite_select_statement` + Dimension::Regex(_) | Dimension::Wildcard => Err(DataFusionError::Internal( + "unexpected regular expression or wildcard found in GROUP BY".into(), + )), + }) + .collect::>>()? + .into_iter() + // We sort the tag set, to ensure correct ordering of the results. The tag columns + // referenced in the `tag_set` variable are added to the sort operator in + // lexicographically ascending order. 
+ .sorted_by(|a, b| a.0.cmp(b.0)) + .unzip(); + + // Tags specified in the `GROUP BY` clause that are not already added to the + projection must be projected, so they can be used in the group key. + // + // At the end of the loop, the `tag_columns` set will contain the tag columns that + exist in the projection and not in the `GROUP BY`. + for col in &tag_set { + if tag_columns.remove(*col) { + continue; + } + + fields.push(Field { + expr: IQLExpr::VarRef { + name: (*col).into(), + data_type: Some(VarRefDataType::Tag), + }, + alias: Some((*col).into()), + }); + } + + // Add the remaining columns to be projected + fields.extend(select.fields.iter().cloned()); + + /// There is always a [INFLUXQL_MEASUREMENT_COLUMN_NAME] column projected in the LogicalPlan, + /// therefore the start index is 1 for determining the offsets of the + /// tag key columns in the column projection list. + const START_INDEX: usize = 1; + + // Create a map of tag key columns to their respective index in the projection + let index_map = fields + .iter() + .enumerate() + .filter_map(|(index, f)| match &f.expr { + IQLExpr::VarRef { + name, + data_type: Some(VarRefDataType::Tag), + } => Some((name.deref().as_str(), index + START_INDEX)), + _ => None, + }) + .collect::>(); + + // tag_set was previously sorted, so tag_key_columns will be in the correct order + meta.tag_key_columns = tag_set + .iter() + .zip(is_projected) + .map(|(tag_key, is_projected)| TagKeyColumn { + tag_key: (*tag_key).to_owned(), + column_index: *index_map.get(*tag_key).unwrap() as u32, + is_projected, + }) + .collect(); + + ( + tag_set, + tag_columns.into_iter().sorted().collect::>(), + ) + } else { + let mut tag_columns = find_tag_columns::>(&select.fields); + tag_columns.sort(); + // Add the remaining columns to be projected + fields.extend(select.fields.iter().cloned()); + (vec![], tag_columns) + }; + let Some(plan) = plans.pop_front() else { return LogicalPlanBuilder::empty(false).build(); }; - let plan = 
self.project_select(plan, select)?; + let plan = self.project_select(plan, select, &fields)?; // If there are multiple measurements, we need to sort by the measurement column // NOTE: Ideally DataFusion would maintain the order of the UNION ALL, which would eliminate // the need to sort by measurement. // See: https://github.com/influxdata/influxdb_iox/issues/7062 let mut series_sort = if !plans.is_empty() { - vec![Expr::sort("iox::measurement".as_expr(), true, false)] + vec![Expr::sort( + INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr(), + true, + false, + )] } else { vec![] }; // UNION the remaining plans let plan = plans.into_iter().try_fold(plan, |prev, next| { - let next = self.project_select(next, select)?; + let next = self.project_select(next, select, &fields)?; LogicalPlanBuilder::from(prev).union(next)?.build() })?; - let plan = if select.group_by.is_none() { - // Generate the following sort: - // iox::measurement, time, [projected tags, sorted lexicographically] + let plan = plan_with_metadata(plan, &meta)?; + + // Construct the sort logical operator + // + // The ordering of the results is as follows: + // + // iox::measurement, [group by tag 0, .., group by tag n], time, [projection tag 0, .., projection tag n] + // + // NOTE: + // + // Sort expressions referring to tag keys are always specified in lexicographically ascending order. 
+ let plan = { + if !group_by_tag_set.is_empty() { + // Adding `LIMIT` or `OFFSET` with a `GROUP BY tag, ...` clause is not supported + // + // See: https://github.com/influxdata/influxdb_iox/issues/6920 + if !group_by_tag_set.is_empty() + && (select.offset.is_some() || select.limit.is_some()) + { + return Err(DataFusionError::NotImplemented( + "GROUP BY combined with LIMIT or OFFSET clause".to_owned(), + )); + } + + series_sort.extend( + group_by_tag_set + .into_iter() + .map(|f| Expr::sort(f.as_expr(), true, false)), + ); + }; series_sort.push(Expr::sort( "time".as_expr(), @@ -192,31 +350,15 @@ impl<'a> InfluxQLToLogicalPlan<'a> { false, )); - series_sort.extend( - select - .fields - .iter() - .filter_map(|f| { - if let IQLExpr::VarRef { - name, - data_type: Some(VarRefDataType::Tag), - } = &f.expr - { - Some(name.deref()) - } else { - None - } - }) - // the tags must be sorted lexicographically in ascending order to match - // the ordering in InfluxQL - .sorted() - .map(|n| Expr::sort(n.as_expr(), true, false)), - ); + if !projection_tag_set.is_empty() { + series_sort.extend( + projection_tag_set + .into_iter() + .map(|f| Expr::sort(f.as_expr(), true, false)), + ); + } + LogicalPlanBuilder::from(plan).sort(series_sort)?.build() - } else { - Err(DataFusionError::NotImplemented( - "GROUP BY not supported".into(), - )) }?; let plan = self.limit(plan, select.offset, select.limit)?; @@ -226,7 +368,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Ok(plan) } - fn project_select(&self, plan: LogicalPlan, select: &SelectStatement) -> Result { + fn project_select( + &self, + plan: LogicalPlan, + select: &SelectStatement, + fields: &[Field], + ) -> Result { let (proj, plan) = match plan { LogicalPlan::Projection(Projection { expr, input, .. 
}) => { (expr, input.deref().clone()) @@ -241,7 +388,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let plan = self.plan_where_clause(&select.condition, plan, &schemas, tz)?; // Process and validate the field expressions in the SELECT projection list - let select_exprs = self.field_list_to_exprs(&plan, &select.fields, &schemas)?; + let select_exprs = self.field_list_to_exprs(&plan, fields, &schemas)?; // Wrap the plan in a `LogicalPlan::Projection` from the select expressions project(plan, proj.into_iter().chain(select_exprs.into_iter())) @@ -290,25 +437,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fn field_list_to_exprs( &self, plan: &LogicalPlan, - fields: &FieldList, + fields: &[Field], schemas: &Schemas, ) -> Result> { - // InfluxQL requires the time column is present in the projection list. - let extra = if !has_time_column(fields) { - vec![Field { - expr: IQLExpr::VarRef { - name: "time".into(), - data_type: Some(VarRefDataType::Timestamp), - }, - alias: None, - }] - } else { - vec![] - }; - - extra + fields .iter() - .chain(fields.iter()) .map(|field| self.field_to_df_expr(field, plan, schemas)) .collect() } @@ -597,7 +730,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Ok(if let Ok(source) = self.s.get_table_provider(&table_name) { Some(project( LogicalPlanBuilder::scan(&table_name, source, None)?.build()?, - iter::once(lit_dict(&table_name).alias("iox::measurement")), + iter::once(lit_dict(&table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)), )?) } else { None @@ -605,6 +738,74 @@ impl<'a> InfluxQLToLogicalPlan<'a> { } } +/// Adds [`InfluxQlMetadata`] to the `plan`. +/// +/// **Note** +/// +/// The metadata does not propagate over RPC requests due to issue [#3779]. 
+/// +/// [#3779]: https://github.com/apache/arrow-rs/issues/3779 +fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result { + fn make_schema(schema: DFSchemaRef, metadata: &InfluxQlMetadata) -> Result { + let data = serde_json::to_string(metadata).map_err(|err| { + DataFusionError::Internal(format!("error serializing InfluxQL metadata: {err}")) + })?; + + let mut md = schema.metadata().clone(); + md.insert(INFLUXQL_METADATA_KEY.to_owned(), data); + + Ok(Arc::new(DFSchema::new_with_metadata( + schema.fields().clone(), + md, + )?)) + } + + Ok(match plan { + LogicalPlan::Projection(mut p) => { + p.schema = make_schema(p.schema, metadata)?; + LogicalPlan::Projection(p) + } + LogicalPlan::Union(mut u) => { + u.schema = make_schema(u.schema, metadata)?; + LogicalPlan::Union(u) + } + _ => { + return Err(DataFusionError::Internal( + "unexpected LogicalPlan".to_owned(), + )) + } + }) +} + +/// Returns `true` if any expressions refer to an aggregate function. +fn has_aggregate_exprs(fields: &FieldList) -> bool { + fields.iter().any(|f| { + walk_expr(&f.expr, &mut |e| match e { + IQLExpr::Call { name, .. } if is_aggregate_function(name) => ControlFlow::Break(()), + _ => ControlFlow::Continue(()), + }) + .is_break() + }) +} + +/// Find all the tag columns projected in the `SELECT` from the field list. +fn find_tag_columns<'a, T: FromIterator<&'a str>>(fields: &'a FieldList) -> T { + fields + .iter() + .filter_map(|f| { + if let IQLExpr::VarRef { + name, + data_type: Some(VarRefDataType::Tag), + } = &f.expr + { + Some(name.deref().as_str()) + } else { + None + } + }) + .collect() +} + /// Perform a series of passes to rewrite `expr` in compliance with InfluxQL behavior /// in an effort to ensure the query executes without error. 
fn rewrite_conditional_expr(expr: Expr, schemas: &Schemas) -> Result { @@ -708,7 +909,7 @@ fn normalize_identifier(ident: &Identifier) -> String { /// > /// > To match InfluxQL, the `time` column must not exist as part of a /// > complex expression. -fn has_time_column(fields: &FieldList) -> bool { +fn has_time_column(fields: &[Field]) -> bool { fields .iter() .any(|f| matches!(&f.expr, IQLExpr::VarRef { name, .. } if name.deref() == "time")) @@ -727,6 +928,72 @@ fn is_scalar_math_function(name: &str) -> bool { SCALAR_MATH_FUNCTIONS.contains(name) } +/// A list of valid aggregate and aggregate-like functions supported by InfluxQL. +/// +/// A full list is available via the [InfluxQL documentation][docs]. +/// +/// > **Note** +/// > +/// > These are not necessarily implemented, and are tracked by the following +/// > issues: +/// > +/// > * +/// > * +/// > * +/// > * +/// > * +/// +/// [docs]: https://docs.influxdata.com/influxdb/v1.8/query_language/functions/ +static AGGREGATE_FUNCTIONS: Lazy> = Lazy::new(|| { + HashSet::from([ + // Scalar-like functions + "cumulative_sum", + "derivative", + "difference", + "elapsed", + "moving_average", + "non_negative_derivative", + "non_negative_difference", + // Selector functions + "bottom", + "first", + "last", + "max", + "min", + "percentile", + "sample", + "top", + // Aggregate functions + "count", + "count", + "integral", + "mean", + "median", + "mode", + "spread", + "stddev", + "sum", + // Prediction functions + "holt_winters", + "holt_winters_with_fit", + // Technical analysis functions + "chande_momentum_oscillator", + "exponential_moving_average", + "double_exponential_moving_average", + "kaufmans_efficiency_ratio", + "kaufmans_adaptive_moving_average", + "triple_exponential_moving_average", + "triple_exponential_derivative", + "relative_strength_index", + ]) +}); + +/// Returns `true` if `name` is an aggregate or aggregate function +/// supported by InfluxQL. 
+fn is_aggregate_function(name: &str) -> bool { + AGGREGATE_FUNCTIONS.contains(name) +} + /// Returns true if the conditional expression is a single node that /// refers to the `time` column. /// @@ -755,12 +1022,12 @@ mod test { use super::*; use crate::exec::Executor; use crate::plan::influxql::test_utils; - use crate::plan::influxql::test_utils::TestDatabaseAdapter; + use crate::plan::influxql::test_utils::{parse_select, TestDatabaseAdapter}; use crate::test::{TestChunk, TestDatabase}; use influxdb_influxql_parser::parse_statements; use insta::assert_snapshot; - fn plan(sql: &str) -> String { + fn logical_plan(sql: &str) -> Result { let mut statements = parse_statements(sql).unwrap(); // index of columns in the above chunk: [bar, foo, i64_field, i64_field_2, time] let executor = Arc::new(Executor::new_testing()); @@ -813,7 +1080,21 @@ mod test { let planner = InfluxQLToLogicalPlan::new(&sp); - match planner.statement_to_plan(statements.pop().unwrap()) { + planner.statement_to_plan(statements.pop().unwrap()) + } + + fn metadata(sql: &str) -> Option { + logical_plan(sql) + .unwrap() + .schema() + .metadata() + .get(INFLUXQL_METADATA_KEY) + .map(|s| serde_json::from_str(s).unwrap()) + } + + fn plan(sql: &str) -> String { + let result = logical_plan(sql); + match result { Ok(res) => res.display_indent_schema().to_string(), Err(err) => err.to_string(), } @@ -840,6 +1121,80 @@ mod test { mod select { use super::*; + /// Validate the metadata is correctly encoded in the schema. + /// + /// Properties that are tested: + /// + /// * only tag keys listed in a `GROUP BY` clause are included in the `tag_key_columns` vector + /// * `tag_key_columns` is order by `tag_key` + #[test] + fn test_metadata_in_schema() { + macro_rules! 
assert_tag_keys { + ($MD:expr $(,($KEY:literal, $VAL:literal, $PROJ:literal))+) => { + assert_eq!( + $MD.tag_key_columns.clone().into_iter().map(|v| (v.tag_key, v.column_index, v.is_projected)).collect::>(), + vec![$(($KEY.to_owned(), $VAL, $PROJ),)*], + "tag keys don't match" + ); + + let keys = $MD.tag_key_columns.into_iter().map(|v| v.tag_key).collect::>(); + let mut sorted = keys.clone(); + sorted.sort_unstable(); + assert_eq!(keys, sorted, "tag keys are not sorted"); + }; + } + + // validate metadata is empty when there is no group by + let md = metadata("SELECT free FROM disk").unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert!(md.tag_key_columns.is_empty()); + let md = metadata("SELECT free FROM disk, cpu").unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert!(md.tag_key_columns.is_empty()); + + let md = metadata("SELECT free FROM disk GROUP BY device").unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert_tag_keys!(md, ("device", 2, false)); + + // validate tag in projection is not included in metadata + let md = + metadata("SELECT cpu, usage_idle, free FROM cpu, disk GROUP BY device").unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert_tag_keys!(md, ("device", 2, false)); + + // validate multiple tags from different measurements + let md = + metadata("SELECT usage_idle, free FROM cpu, disk GROUP BY cpu, device").unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert_tag_keys!(md, ("cpu", 2, false), ("device", 3, false)); + + // validate multiple tags from different measurements, and key order is maintained + let md = + metadata("SELECT usage_idle, free FROM cpu, disk GROUP BY device, cpu").unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert_tag_keys!(md, ("cpu", 2, false), ("device", 3, false)); + + // validate that with cpu tag explicitly listed in project, tag-key order is maintained and column index + // is valid + let md = metadata("SELECT usage_idle, free, cpu FROM cpu, disk 
GROUP BY cpu, device") + .unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert_tag_keys!(md, ("cpu", 5, true), ("device", 2, false)); + + // validate region tag, shared by both measurements, is still correctly handled + let md = metadata( + "SELECT region, usage_idle, free, cpu FROM cpu, disk GROUP BY region, cpu, device", + ) + .unwrap(); + assert_eq!(md.measurement_column_index, 0); + assert_tag_keys!( + md, + ("cpu", 6, true), + ("device", 2, false), + ("region", 3, true) + ); + } + /// Verify the behaviour of the `FROM` clause when selecting from zero to many measurements. #[test] fn test_from_zero_to_many() { @@ -1190,6 +1545,16 @@ mod test { } } + /// Tests to validate InfluxQL `SELECT` statements that utilise aggregate functions. + mod select_aggregate { + use super::*; + + #[test] + fn test_aggregates_are_not_yet_supported() { + assert_snapshot!(plan("SELECT count(f64_field) FROM data"), @"This feature is not implemented: aggregate functions"); + } + } + /// Tests to validate InfluxQL `SELECT` statements that project columns without specifying /// aggregates or `GROUP BY time()` with gap filling. 
mod select_raw { @@ -1266,6 +1631,105 @@ mod test { "###); } + #[test] + fn test_select_single_measurement_group_by() { + // Sort should be cpu, time + assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu"), @r###" + Sort: cpu ASC NULLS LAST, cpu.time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + // Sort should be cpu, time + assert_snapshot!(plan("SELECT cpu, usage_idle FROM cpu GROUP BY cpu"), @r###" + Sort: cpu ASC NULLS LAST, cpu.time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + // Sort should be cpu, region, time + assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu, region"), @r###" + Sort: cpu ASC NULLS LAST, region ASC NULLS LAST, cpu.time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, 
usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, cpu.cpu AS cpu, cpu.region AS region, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + // Sort should be cpu, region, time + assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY region, cpu"), @r###" + Sort: cpu ASC NULLS LAST, region ASC NULLS LAST, cpu.time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, cpu.cpu AS cpu, cpu.region AS region, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + // Sort should be cpu, time, region + assert_snapshot!(plan("SELECT region, usage_idle FROM cpu GROUP BY cpu"), @r###" + Sort: cpu ASC NULLS LAST, cpu.time ASC NULLS LAST, region ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, cpu.cpu AS cpu, cpu.region AS region, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), 
time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + } + + #[test] + fn test_select_multiple_measurements_group_by() { + // Sort should be iox::measurement, cpu, time + assert_snapshot!(plan("SELECT usage_idle, free FROM cpu, disk GROUP BY cpu"), @r###" + Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + "###); + + // Sort should be iox::measurement, cpu, device, time + assert_snapshot!(plan("SELECT usage_idle, free FROM cpu, disk GROUP BY device, 
cpu"), @r###" + Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + "###); + + // Sort should be iox::measurement, cpu, time, device + assert_snapshot!(plan("SELECT device, usage_idle, free FROM cpu, disk GROUP BY cpu"), @r###" + Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, time ASC NULLS LAST, device ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, 
usage_idle:Float64;N, free:Null;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + "###); + + // Sort should be iox::measurement, cpu, device, time + assert_snapshot!(plan("SELECT cpu, usage_idle, free FROM cpu, disk GROUP BY cpu, device"), @r###" + Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: cpu 
[cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + "###); + + // Sort should be iox::measurement, device, time, cpu + assert_snapshot!(plan("SELECT cpu, usage_idle, free FROM cpu, disk GROUP BY device"), @r###" + Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, NULL AS free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, NULL AS 
free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, free:Null;N]
+            TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
+            "###);
+        }
+
+        #[test]
+        fn test_select_group_by_limit_offset() {
+            // Should return a "not implemented" error, as the snapshots below assert
+            assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu LIMIT 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause");
+            assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu OFFSET 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause");
+            assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu LIMIT 1 OFFSET 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause");
+        }
+
         // The following is an outline of additional scenarios to develop
// This is not an exhaustive list and is expected to grow as the @@ -1395,4 +1859,34 @@ mod test { assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data")); } } + + #[test] + fn test_has_aggregate_exprs() { + let sel = parse_select("SELECT count(usage) FROM cpu"); + assert!(has_aggregate_exprs(&sel.fields)); + + // Can be part of a complex expression + let sel = parse_select("SELECT sum(usage) + count(usage) FROM cpu"); + assert!(has_aggregate_exprs(&sel.fields)); + + // Can be mixed with scalar columns + let sel = parse_select("SELECT idle, first(usage) FROM cpu"); + assert!(has_aggregate_exprs(&sel.fields)); + + // Are case insensitive + let sel = parse_select("SELECT Count(usage) FROM cpu"); + assert!(has_aggregate_exprs(&sel.fields)); + + // Returns false where it is not a valid aggregate function + let sel = parse_select("SELECT foo(usage) FROM cpu"); + assert!(!has_aggregate_exprs(&sel.fields)); + + // Returns false when it is a math function + let sel = parse_select("SELECT abs(usage) FROM cpu"); + assert!(!has_aggregate_exprs(&sel.fields)); + + // Returns false when there are only scalar functions + let sel = parse_select("SELECT usage, idle FROM cpu"); + assert!(!has_aggregate_exprs(&sel.fields)); + } } diff --git a/iox_query/src/plan/influxql/rewriter.rs b/iox_query/src/plan/influxql/rewriter.rs index f595a5cb13..bd25cace33 100644 --- a/iox_query/src/plan/influxql/rewriter.rs +++ b/iox_query/src/plan/influxql/rewriter.rs @@ -691,7 +691,7 @@ mod test { let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_i64::integer) AS COUNT FROM temp_01" + "SELECT count(field_i64::integer) AS count FROM temp_01" ); // Duplicate aggregate columns @@ -699,14 +699,14 @@ mod test { let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_i64::integer) AS COUNT, COUNT(field_i64::integer) AS COUNT_1 FROM temp_01" + "SELECT count(field_i64::integer) AS 
count, count(field_i64::integer) AS count_1 FROM temp_01" ); let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01"); let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_f64::float) AS COUNT FROM temp_01" + "SELECT count(field_f64::float) AS count FROM temp_01" ); // Expands all fields @@ -714,7 +714,7 @@ mod test { let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64, COUNT(field_str::string) AS COUNT_field_str, COUNT(field_u64::unsigned) AS COUNT_field_u64, COUNT(shared_field0::float) AS COUNT_shared_field0 FROM temp_01" + "SELECT count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_str::string) AS count_field_str, count(field_u64::unsigned) AS count_field_u64, count(shared_field0::float) AS count_shared_field0 FROM temp_01" ); // Expands matching fields @@ -722,7 +722,7 @@ mod test { let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), - "SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64, COUNT(field_u64::unsigned) AS COUNT_field_u64 FROM temp_01" + "SELECT count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_u64::unsigned) AS count_field_u64 FROM temp_01" ); // Expands only numeric fields @@ -730,7 +730,7 @@ mod test { let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), - "SELECT SUM(field_f64::float) AS SUM_field_f64, SUM(field_i64::integer) AS SUM_field_i64, SUM(field_u64::unsigned) AS SUM_field_u64, SUM(shared_field0::float) AS SUM_shared_field0 FROM temp_01" + "SELECT sum(field_f64::float) AS sum_field_f64, sum(field_i64::integer) AS sum_field_i64, sum(field_u64::unsigned) AS sum_field_u64, sum(shared_field0::float) AS sum_shared_field0 FROM temp_01" 
); let stmt = parse_select("SELECT * FROM merge_00, merge_01"); @@ -760,7 +760,7 @@ mod test { let err = rewrite_statement(&namespace, &stmt).unwrap_err(); assert_eq!( err.to_string(), - "External error: unable to use tag as wildcard in COUNT()" + "External error: unable to use tag as wildcard in count()" ); } diff --git a/schema/src/lib.rs b/schema/src/lib.rs index 26f270dbf6..6dd3fa284d 100644 --- a/schema/src/lib.rs +++ b/schema/src/lib.rs @@ -20,6 +20,11 @@ use snafu::{OptionExt, Snafu}; /// The name of the timestamp column in the InfluxDB datamodel pub const TIME_COLUMN_NAME: &str = "time"; +/// The name of the column specifying the source measurement for a row for an InfluxQL query. +pub const INFLUXQL_MEASUREMENT_COLUMN_NAME: &str = "iox::measurement"; +/// The key identifying the schema-level metadata. +pub const INFLUXQL_METADATA_KEY: &str = "iox::influxql::group_key::metadata"; + /// The Timezone to use for InfluxDB timezone (should be a constant) #[allow(non_snake_case)] pub fn TIME_DATA_TIMEZONE() -> Option { diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 13a6243976..5f95a5e6ce 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -742,7 +742,7 @@ struct IOxFlightDataEncoderBuilder { impl IOxFlightDataEncoderBuilder { fn new(schema: SchemaRef) -> Self { Self { - inner: FlightDataEncoderBuilder::new(), + inner: FlightDataEncoderBuilder::new().with_schema(Arc::clone(&schema)), schema: prepare_schema_for_flight(schema), } } @@ -819,7 +819,7 @@ fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef { }) .collect(); - Arc::new(Schema::new(fields)) + Arc::new(Schema::new(fields).with_metadata(schema.metadata().clone())) } impl Stream for GetStream {