Merge pull request #7009 from influxdata/js-5750-wildcard-support-for-aggregates
Support wildcards in aggregate functionspull/7086/head
commit
94e07158d5
|
@ -43,6 +43,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
|
|||
- [#7046](https://github.com/influxdata/influxdb/pull/7046): Add tsm file export to influx_inspect tool.
|
||||
- [#7011](https://github.com/influxdata/influxdb/issues/7011): Create man pages for commands.
|
||||
- [#7050](https://github.com/influxdata/influxdb/pull/7050): Update go package library dependencies.
|
||||
- [#5750](https://github.com/influxdata/influxdb/issues/5750): Support wildcards in aggregate functions.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -876,8 +876,13 @@ func TestServer_Query_Count(t *testing.T) {
|
|||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "selecting count(*) should error",
|
||||
name: "selecting count(*) should expand the wildcard",
|
||||
command: `SELECT count(*) FROM db0.rp0.cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count_value"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "selecting count(2) should error",
|
||||
command: `SELECT count(2) FROM db0.rp0.cpu`,
|
||||
exp: `{"error":"error parsing query: expected field argument in count()"}`,
|
||||
},
|
||||
}...)
|
||||
|
|
|
@ -1134,6 +1134,68 @@ func (s *SelectStatement) RewriteFields(ic IteratorCreator) (*SelectStatement, e
|
|||
}
|
||||
rwFields = append(rwFields, &Field{Expr: &VarRef{Val: ref.Val, Type: ref.Type}})
|
||||
}
|
||||
case *Call:
|
||||
// Clone a template that we can modify and use for new fields.
|
||||
template := CloneExpr(expr).(*Call)
|
||||
|
||||
// Search for the call with a wildcard by continuously descending until
|
||||
// we no longer have a call.
|
||||
call := template
|
||||
for len(call.Args) > 0 {
|
||||
arg, ok := call.Args[0].(*Call)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
call = arg
|
||||
}
|
||||
|
||||
// Check if this field value is a wildcard.
|
||||
if len(call.Args) == 0 {
|
||||
rwFields = append(rwFields, f)
|
||||
continue
|
||||
}
|
||||
|
||||
wc, ok := call.Args[0].(*Wildcard)
|
||||
if ok && wc.Type == TAG {
|
||||
return s, fmt.Errorf("unable to use tag wildcard in %s()", call.Name)
|
||||
} else if !ok {
|
||||
rwFields = append(rwFields, f)
|
||||
continue
|
||||
}
|
||||
|
||||
// All types that can expand wildcards support float and integer.
|
||||
supportedTypes := map[DataType]struct{}{
|
||||
Float: struct{}{},
|
||||
Integer: struct{}{},
|
||||
}
|
||||
|
||||
// Add additional types for certain functions.
|
||||
switch call.Name {
|
||||
case "count", "first", "last", "distinct", "elapsed":
|
||||
supportedTypes[String] = struct{}{}
|
||||
supportedTypes[Boolean] = struct{}{}
|
||||
case "stddev":
|
||||
supportedTypes[String] = struct{}{}
|
||||
case "min", "max":
|
||||
supportedTypes[Boolean] = struct{}{}
|
||||
}
|
||||
|
||||
for _, ref := range fields {
|
||||
// Do not expand tags within a function call. It likely won't do anything
|
||||
// anyway and will be the wrong thing in 99% of cases.
|
||||
if ref.Type == Tag {
|
||||
continue
|
||||
} else if _, ok := supportedTypes[ref.Type]; !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Make a new expression and replace the wildcard within this cloned expression.
|
||||
call.Args[0] = &VarRef{Val: ref.Val, Type: ref.Type}
|
||||
rwFields = append(rwFields, &Field{
|
||||
Expr: CloneExpr(template),
|
||||
Alias: fmt.Sprintf("%s_%s", f.Name(), ref.Val),
|
||||
})
|
||||
}
|
||||
default:
|
||||
rwFields = append(rwFields, f)
|
||||
}
|
||||
|
@ -1357,15 +1419,17 @@ func (s *SelectStatement) HasWildcard() bool {
|
|||
}
|
||||
|
||||
// HasFieldWildcard returns whether or not the select statement has at least 1 wildcard in the fields
|
||||
func (s *SelectStatement) HasFieldWildcard() bool {
|
||||
for _, f := range s.Fields {
|
||||
_, ok := f.Expr.(*Wildcard)
|
||||
if ok {
|
||||
return true
|
||||
func (s *SelectStatement) HasFieldWildcard() (hasWildcard bool) {
|
||||
WalkFunc(s.Fields, func(n Node) {
|
||||
if hasWildcard {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
_, ok := n.(*Wildcard)
|
||||
if ok {
|
||||
hasWildcard = true
|
||||
}
|
||||
})
|
||||
return hasWildcard
|
||||
}
|
||||
|
||||
// HasDimensionWildcard returns whether or not the select statement has
|
||||
|
@ -1622,7 +1686,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
|||
}
|
||||
|
||||
switch fc := c.Args[0].(type) {
|
||||
case *VarRef:
|
||||
case *VarRef, *Wildcard:
|
||||
// do nothing
|
||||
case *Call:
|
||||
if fc.Name != "distinct" || expr.Name != "count" {
|
||||
|
@ -1686,7 +1750,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
|||
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
|
||||
}
|
||||
switch fc := expr.Args[0].(type) {
|
||||
case *VarRef:
|
||||
case *VarRef, *Wildcard:
|
||||
// do nothing
|
||||
case *Call:
|
||||
if fc.Name != "distinct" || expr.Name != "count" {
|
||||
|
@ -3470,7 +3534,7 @@ func CloneExpr(expr Expr) Expr {
|
|||
case *VarRef:
|
||||
return &VarRef{Val: expr.Val, Type: expr.Type}
|
||||
case *Wildcard:
|
||||
return &Wildcard{}
|
||||
return &Wildcard{Type: expr.Type}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
|
|
@ -428,6 +428,39 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
|
|||
stmt: `SELECT * FROM cpu GROUP BY *`,
|
||||
rewrite: `SELECT value1::float, value2::integer FROM cpu GROUP BY host, region`,
|
||||
},
|
||||
|
||||
// Wildcard function with all fields.
|
||||
{
|
||||
stmt: `SELECT mean(*) FROM cpu`,
|
||||
rewrite: `SELECT mean(value1::float) AS mean_value1, mean(value2::integer) AS mean_value2 FROM cpu`,
|
||||
},
|
||||
|
||||
{
|
||||
stmt: `SELECT distinct(*) FROM strings`,
|
||||
rewrite: `SELECT distinct(string::string) AS distinct_string, distinct(value::float) AS distinct_value FROM strings`,
|
||||
},
|
||||
|
||||
{
|
||||
stmt: `SELECT distinct(*) FROM bools`,
|
||||
rewrite: `SELECT distinct(bool::boolean) AS distinct_bool, distinct(value::float) AS distinct_value FROM bools`,
|
||||
},
|
||||
|
||||
// Wildcard function with some fields excluded.
|
||||
{
|
||||
stmt: `SELECT mean(*) FROM strings`,
|
||||
rewrite: `SELECT mean(value::float) AS mean_value FROM strings`,
|
||||
},
|
||||
|
||||
{
|
||||
stmt: `SELECT mean(*) FROM bools`,
|
||||
rewrite: `SELECT mean(value::float) AS mean_value FROM bools`,
|
||||
},
|
||||
|
||||
// Wildcard function with an alias.
|
||||
{
|
||||
stmt: `SELECT mean(*) AS alias FROM cpu`,
|
||||
rewrite: `SELECT mean(value1::float) AS alias_value1, mean(value2::integer) AS alias_value2 FROM cpu`,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
|
@ -439,7 +472,24 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
|
|||
|
||||
var ic IteratorCreator
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
fields = map[string]influxql.DataType{"value1": influxql.Float, "value2": influxql.Integer}
|
||||
source := sources[0].(*influxql.Measurement)
|
||||
switch source.Name {
|
||||
case "cpu":
|
||||
fields = map[string]influxql.DataType{
|
||||
"value1": influxql.Float,
|
||||
"value2": influxql.Integer,
|
||||
}
|
||||
case "strings":
|
||||
fields = map[string]influxql.DataType{
|
||||
"value": influxql.Float,
|
||||
"string": influxql.String,
|
||||
}
|
||||
case "bools":
|
||||
fields = map[string]influxql.DataType{
|
||||
"value": influxql.Float,
|
||||
"bool": influxql.Boolean,
|
||||
}
|
||||
}
|
||||
dimensions = map[string]struct{}{"host": struct{}{}, "region": struct{}{}}
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue