fix: upgrade influxql to latest version & fix predicate handling for show tag values metaqueries (#22500)

* feat: Add WITH KEY to show tag keys

* fix: add tests for multi measurement tag value queries

* chore: fix linter problems

* chore: revert influxql changes to keep WITH KEY disabled

* chore: add TODO for moving flux tests to flux repo

Co-authored-by: Sam Arnold <sarnold@influxdata.com>
pull/22509/head
William Baker 2021-09-17 11:14:03 -06:00 committed by GitHub
parent 33afff378c
commit 1f66b3110e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 240 additions and 384 deletions

2
go.mod
View File

@ -108,7 +108,7 @@ require (
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210813175002-13799e7662c0
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 // indirect
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/influxdata/pkg-config v0.2.8
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b // indirect

7
go.sum
View File

@ -234,6 +234,7 @@ github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
@ -367,8 +368,8 @@ github.com/influxdata/influx-cli/v2 v2.1.1-0.20210813175002-13799e7662c0 h1:llPY
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210813175002-13799e7662c0/go.mod h1:3KoUqKdsfmm7CREOuWnbYJZbl6j2akSdQUaLctE42so=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93 h1:4t/8PcmLnI2vrcaHcEKeeLsGxC0WMRaOQdPX9b7DF8Y=
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803 h1:LpaVAM5Www2R7M0GJAxAdL3swBvmna8Pyzw6F7o+j04=
@ -404,6 +405,7 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C
github.com/kevinburke/go-bindata v3.22.0+incompatible h1:/JmqEhIWQ7GRScV0WjX/0tqBrC5D21ALg0H0U/KZ/ts=
github.com/kevinburke/go-bindata v3.22.0+incompatible/go.mod h1:/pEEZ72flUW2p0yi30bslSp9YqD9pysLxunQDdb2CPM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -828,6 +830,7 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

View File

@ -0,0 +1,132 @@
// TODO(whb): These tests should get ported to the flux repo and removed here
// when they are included with a flux release that InfluxDB uses to remove the
// redundancy.
package influxdb_test
import "csv"
import "testing"
import "testing/expect"
option now = () => 2030-01-01T00:00:00Z
input = "
#group,false,false,false,false,true,true,true,true,true,true,true
#datatype,string,long,dateTime:RFC3339,long,string,string,string,string,string,string,string
#default,_result,,,,,,,,,,
,result,table,_time,_value,_field,_measurement,device,fstype,host,mode,path
,,0,2020-10-21T20:48:30Z,4881964326,inodes_free,disk,disk1s5,apfs,euterpe.local,ro,/
,,0,2020-10-21T20:48:40Z,4881964326,inodes_free,disk,disk1s5,apfs,euterpe.local,ro,/
,,0,2020-10-21T20:48:50Z,4881964326,inodes_free,disk,disk1s5,apfs,euterpe.local,ro,/
,,1,2020-10-21T20:48:30Z,4294963701,inodes_free,disk,disk2s1,hfs,euterpe.local,ro,/Volumes/IntelliJ IDEA CE
,,1,2020-10-21T20:48:40Z,4294963701,inodes_free,disk,disk2s1,hfs,euterpe.local,ro,/Volumes/IntelliJ IDEA CE
,,1,2020-10-21T20:48:50Z,4294963701,inodes_free,disk,disk2s1,hfs,euterpe.local,ro,/Volumes/IntelliJ IDEA CE
,,2,2020-10-21T20:48:30Z,488514,inodes_used,disk,disk1s5,apfs,euterpe.local,ro,/
,,2,2020-10-21T20:48:40Z,488514,inodes_used,disk,disk1s5,apfs,euterpe.local,ro,/
,,2,2020-10-21T20:48:50Z,488514,inodes_used,disk,disk1s5,apfs,euterpe.local,ro,/
,,3,2020-10-21T20:48:30Z,3578,inodes_used,disk,disk2s1,hfs,euterpe.local,ro,/Volumes/IntelliJ IDEA CE
,,3,2020-10-21T20:48:40Z,3578,inodes_used,disk,disk2s1,hfs,euterpe.local,ro,/Volumes/IntelliJ IDEA CE
,,3,2020-10-21T20:48:50Z,3578,inodes_used,disk,disk2s1,hfs,euterpe.local,ro,/Volumes/IntelliJ IDEA CE
#group,false,false,false,false,true,true,true,true,true
#datatype,string,long,dateTime:RFC3339,double,string,string,string,string,string
#default,_result,,,,,,,,
,result,table,_time,_value,_field,_measurement,cpu,host,region
,,4,2020-10-21T20:48:30Z,69.30000000167638,usage_idle,cpu,cpu0,euterpe.local,south
,,4,2020-10-21T20:48:40Z,67.36736736724372,usage_idle,cpu,cpu0,euterpe.local,south
,,4,2020-10-21T20:48:50Z,69.23076923005354,usage_idle,cpu,cpu0,euterpe.local,south
,,5,2020-10-21T20:48:30Z,96.10000000102445,usage_idle,cpu,cpu1,euterpe.local,south
,,5,2020-10-21T20:48:40Z,95.70000000055181,usage_idle,cpu,cpu1,euterpe.local,south
,,5,2020-10-21T20:48:50Z,95.89999999860534,usage_idle,cpu,cpu1,euterpe.local,south
#group,false,false,false,false,true,true,true,true,true
#datatype,string,long,dateTime:RFC3339,double,string,string,string,string,string
#default,_result,,,,,,,,
,result,table,_time,_value,_field,_measurement,cpu,host,region
,,6,2020-10-21T20:48:30Z,69.30000000167638,usage_idle,cpu,cpu0,mnemosyne.local,east
,,6,2020-10-21T20:48:40Z,67.36736736724372,usage_idle,cpu,cpu0,mnemosyne.local,east
,,6,2020-10-21T20:48:50Z,69.23076923005354,usage_idle,cpu,cpu0,mnemosyne.local,east
,,7,2020-10-21T20:48:30Z,96.10000000102445,usage_idle,cpu,cpu1,mnemosyne.local,east
,,7,2020-10-21T20:48:40Z,95.70000000055181,usage_idle,cpu,cpu1,mnemosyne.local,east
,,7,2020-10-21T20:48:50Z,95.89999999860534,usage_idle,cpu,cpu1,mnemosyne.local,east
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,string,string,dateTime:RFC3339,double,string,string,string
#default,_result,,,,,,,,
,result,table,_field,_measurement,_time,_value,cpu,host,region
,,8,usage_user,cpu,2020-10-21T20:48:30Z,19.30000000007567,cpu0,euterpe.local,north
,,8,usage_user,cpu,2020-10-21T20:48:40Z,20.020020020038682,cpu0,euterpe.local,north
,,8,usage_user,cpu,2020-10-21T20:48:50Z,18.581418581407107,cpu0,euterpe.local,north
,,9,usage_user,cpu,2020-10-21T20:48:30Z,2.3000000000138243,cpu1,euterpe.local,north
,,9,usage_user,cpu,2020-10-21T20:48:40Z,2.4000000000536965,cpu1,euterpe.local,north
,,9,usage_user,cpu,2020-10-21T20:48:50Z,2.0999999999423746,cpu1,euterpe.local,north
"
// SHOW TAG VALUES with a predicate of the form
// (_measurement == X) OR (tag == Y): an OR branch that references a
// non-matching measurement must not prevent the tag-values push-down.
testcase tag_values_measurement_or_predicate {
got = testing.loadStorage(csv: input)
|> range(start: -100y)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["_measurement"] == "someOtherThing" or r["host"] == "euterpe.local")
|> keep(columns: ["region"])
|> group()
|> distinct(column: "region")
|> limit(n: 200)
|> sort()
// Only the euterpe.local cpu rows match, whose region values in the
// input fixture are "south" (usage_idle) and "north" (usage_user).
want = csv.from(csv: "#datatype,string,long,string
#group,false,false,false
#default,0,,
,result,table,_value
,,0,north
,,0,south
")
// The query must be satisfied by exactly one ReadTagValues push-down.
expect.planner(rules: ["PushDownReadTagValuesRule": 1])
testing.diff(got, want)
}
// Same shape as the previous case but with negated comparisons:
// (_measurement != "cpu") combined with an OR predicate containing a
// != tag comparison must still push down to ReadTagValues.
testcase tag_values_measurement_or_negation {
got = testing.loadStorage(csv: input)
|> range(start: -100y)
|> filter(fn: (r) => r["_measurement"] != "cpu")
|> filter(fn: (r) => r["_measurement"] == "someOtherThing" or r["fstype"] != "apfs")
|> keep(columns: ["fstype"])
|> group()
|> distinct(column: "fstype")
|> limit(n: 200)
|> sort()
// Only the disk measurement survives != "cpu", and excluding apfs
// leaves "hfs" as the single distinct fstype in the fixture.
want = csv.from(csv: "#datatype,string,long,string
#group,false,false,false
#default,0,,
,result,table,_value
,,0,hfs
")
expect.planner(rules: ["PushDownReadTagValuesRule": 1])
testing.diff(got, want)
}
// Regex variant: a =~ measurement filter plus an OR predicate with a
// !~ tag comparison must also be handled by the tag-values push-down.
testcase tag_values_measurement_or_regex {
got = testing.loadStorage(csv: input)
|> range(start: -100y)
|> filter(fn: (r) => r["_measurement"] =~ /cp.*/)
|> filter(fn: (r) => r["_measurement"] == "someOtherThing" or r["host"] !~ /mnemo.*/)
|> keep(columns: ["region"])
|> group()
|> distinct(column: "region")
|> limit(n: 200)
|> sort()
// Excluding the mnemosyne.local host leaves the euterpe.local cpu
// rows, whose regions are "south" and "north" in the input fixture.
want = csv.from(csv: "#datatype,string,long,string
#group,false,false,false
#default,0,,
,result,table,_value
,,0,north
,,0,south
")
expect.planner(rules: ["PushDownReadTagValuesRule": 1])
testing.diff(got, want)
}

View File

@ -1458,7 +1458,7 @@ func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.E
case *influxql.ParenExpr:
return is.measurementNamesByExpr(auth, e.Expr)
default:
return nil, fmt.Errorf("%#v", expr)
return nil, fmt.Errorf("invalid measurement expression %#v", expr)
}
}
@ -2819,43 +2819,40 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt
return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}
// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
//
// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// tagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// position of the tag key in the keys argument.
//
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
// lexicographic order.
func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
release := is.SeriesFile.Retain()
defer release()
return is.tagValuesByKeyAndExpr(auth, name, keys, expr)
}
// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See
// TagValuesByKeyAndExpr for more details.
//
// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
// series file.
func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
database := is.Database()
valueExpr := influxql.CloneExpr(expr)
valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr {
valueExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "value" {
return nil
if ok && tag.Val == "value" {
return true, nil
}
}
}
return e
}), nil)
return false, nil
})
if err != nil {
return nil, err
}
if remainingExpr == nil {
remainingExpr = &influxql.BooleanLiteral{Val: true}
}
itr, err := is.seriesByExprIterator(name, expr)
itr, err := is.seriesByExprIterator(name, remainingExpr)
if err != nil {
return nil, err
} else if itr == nil {
@ -2888,6 +2885,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
break
}
if e.Expr != nil {
// We don't yet have code that correctly processes expressions that
// seriesByExprIterator doesn't handle
lit, ok := e.Expr.(*influxql.BooleanLiteral)
if !ok {
return nil, fmt.Errorf("expression too complex for metaquery: %v", e.Expr)
}
if !lit.Val {
continue
}
}
buf := is.SeriesFile.SeriesKey(e.SeriesID)
if len(buf) == 0 {
continue

View File

@ -785,26 +785,6 @@ func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
return engine.MeasurementNamesByRegex(re)
}
// MeasurementTagKeysByExpr returns the set of tag keys for the named
// measurement that satisfy expr, delegating to the shard's engine.
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
	eng, err := s.Engine()
	if err != nil {
		return nil, err
	}
	return eng.MeasurementTagKeysByExpr(name, expr)
}
// MeasurementTagKeyValuesByExpr returns the tag values for each of the
// provided tag keys on the named measurement, filtered by expr. The
// keysSorted flag is passed through to the index set (it indicates
// whether key is already sorted — see IndexSet for details).
func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
	idx, err := s.Index()
	if err != nil {
		return nil, err
	}
	is := IndexSet{Indexes: []Index{idx}, SeriesFile: s.sfile}
	return is.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted)
}
// MeasurementNamesByPredicate returns fields for a measurement filtered by an expression.
func (s *Shard) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error) {
index, err := s.Index()

View File

@ -13,7 +13,6 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -1524,35 +1523,29 @@ func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []u
return nil, nil
}
measurementExpr := influxql.CloneExpr(cond)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
// take out the _name = 'mymeasurement' clause from 'FROM' clause
measurementExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(cond), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
if ok && tag.Val == "_name" {
return true, nil
}
}
}
return e
}), nil)
return false, nil
})
if err != nil {
return nil, err
}
filterExpr := influxql.CloneExpr(cond)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || influxql.IsSystemName(tag.Val) {
return nil
}
}
}
return e
}), nil)
// take out the _tagKey = 'mykey' clause from 'WITH KEY' clause
tagKeyExpr, filterExpr, err := influxql.PartitionExpr(remainingExpr, isTagKeyClause)
if err != nil {
return nil, err
}
// Get all the shards we're interested in.
is := IndexSet{Indexes: make([]Index, 0, len(shardIDs))}
@ -1599,7 +1592,7 @@ func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []u
}
// Build keyset over all indexes for measurement.
tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil)
tagKeySet, err := is.MeasurementTagKeysByExpr(name, tagKeyExpr)
if err != nil {
return nil, err
} else if len(tagKeySet) == 0 {
@ -1695,42 +1688,66 @@ func (a tagValuesSlice) Len() int { return len(a) }
func (a tagValuesSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
// isTagKeyClause reports whether expr consists entirely of comparisons
// against the "_tagKey" system tag, possibly combined with AND/OR and
// wrapped in parentheses. Such clauses originate from WITH KEY and can
// be partitioned out of a metaquery condition.
func isTagKeyClause(expr influxql.Expr) (bool, error) {
	switch n := expr.(type) {
	case *influxql.ParenExpr:
		// Parentheses are transparent; inspect the inner expression.
		return isTagKeyClause(n.Expr)
	case *influxql.BinaryExpr:
		switch n.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			// A leaf comparison qualifies only when its LHS is the
			// _tagKey variable reference.
			if ref, isRef := n.LHS.(*influxql.VarRef); isRef && ref.Val == "_tagKey" {
				return true, nil
			}
		case influxql.OR, influxql.AND:
			// A conjunction/disjunction qualifies only when both
			// sides do.
			lhsOK, err := isTagKeyClause(n.LHS)
			if err != nil {
				return false, err
			}
			rhsOK, err := isTagKeyClause(n.RHS)
			if err != nil {
				return false, err
			}
			return lhsOK && rhsOK, nil
		}
	}
	return false, nil
}
// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
if len(shardIDs) == 0 {
return nil, nil
}
if cond == nil {
return nil, errors.New("a condition is required")
}
measurementExpr := influxql.CloneExpr(cond)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
// take out the _name = 'mymeasurement' clause from 'FROM' clause
measurementExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(cond), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
if ok && tag.Val == "_name" {
return true, nil
}
}
}
return e
}), nil)
return false, nil
})
if err != nil {
return nil, err
}
filterExpr := influxql.CloneExpr(cond)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || influxql.IsSystemName(tag.Val) {
return nil
}
}
}
return e
}), nil)
// take out the _tagKey = 'mykey' clause from 'WITH KEY' / 'WITH KEY IN' clause
tagKeyExpr, filterExpr, err := influxql.PartitionExpr(remainingExpr, isTagKeyClause)
if err != nil {
return nil, err
}
// Build index set to work on.
is := IndexSet{Indexes: make([]Index, 0, len(shardIDs))}
@ -1760,8 +1777,6 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
}
s.mu.RUnlock()
// Stores each list of TagValues for each measurement.
var allResults []tagValues
var maxMeasurements int // Hint as to lower bound on number of measurements.
// names will be sorted by MeasurementNamesByExpr.
// Authorisation can be done later on, when series may have been filtered
@ -1775,9 +1790,8 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
maxMeasurements = len(names)
}
if allResults == nil {
allResults = make([]tagValues, 0, len(is.Indexes)*len(names)) // Assuming all series in all shards.
}
// Stores each list of TagValues for each measurement.
allResults := make([]tagValues, 0, len(names))
// Iterate over each matching measurement in the shard. For each
// measurement we'll get the matching tag keys (e.g., when a WITH KEYS)
@ -1793,7 +1807,7 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
}
// Determine a list of keys from condition.
keySet, err := is.MeasurementTagKeysByExpr(name, cond)
keySet, err := is.MeasurementTagKeysByExpr(name, tagKeyExpr)
if err != nil {
return nil, err
}
@ -1812,7 +1826,7 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
for k := range keySet {
result.keys = append(result.keys, k)
}
sort.Sort(sort.StringSlice(result.keys))
sort.Strings(result.keys)
// get all the tag values for each key in the keyset.
// Each slice in the results contains the sorted values associated
@ -1841,19 +1855,11 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
}
}
// Not sure this is necessary, should be pre-sorted
sort.Sort(tagValuesSlice(allResults))
result := make([]TagValues, 0, maxMeasurements)
// We need to sort all results by measurement name.
if len(is.Indexes) > 1 {
sort.Sort(tagValuesSlice(allResults))
}
// The next stage is to merge the tagValue results for each shard's measurements.
var i, j int
// Used as a temporary buffer in mergeTagValues. There can be at most len(shards)
// instances of tagValues for a given measurement.
idxBuf := make([][2]int, 0, len(is.Indexes))
for i < len(allResults) {
for _, r := range allResults {
// check for timeouts
select {
case <-ctx.Done():
@ -1861,20 +1867,7 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
default:
}
// Gather all occurrences of the same measurement for merging.
for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) {
j++
}
// An invariant is that there can't be more than n instances of tag
// key value pairs for a given measurement, where n is the number of
// shards.
if got, exp := j-i+1, len(is.Indexes); got > exp {
return nil, fmt.Errorf("unexpected results returned engine. Got %d measurement sets for %d shards", got, exp)
}
nextResult := mergeTagValues(idxBuf, allResults[i:j+1]...)
i = j + 1
nextResult := makeTagValues(r)
if len(nextResult.Values) > 0 {
result = append(result, nextResult)
}
@ -1882,109 +1875,15 @@ func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs [
return result, nil
}
// mergeTagValues merges multiple sorted sets of temporary tagValues using a
// direct k-way merge whilst also removing duplicated entries. The result is a
// single TagValue type.
//
// TODO(edd): a Tournament based merge (see: Knuth's TAOCP 5.4.1) might be more
// appropriate at some point.
//
func mergeTagValues(valueIdxs [][2]int, tvs ...tagValues) TagValues {
func makeTagValues(tv tagValues) TagValues {
var result TagValues
if len(tvs) == 0 {
return TagValues{}
} else if len(tvs) == 1 {
result.Measurement = string(tvs[0].name)
// TODO(edd): will be too small likely. Find a hint?
result.Values = make([]KeyValue, 0, len(tvs[0].values))
result.Measurement = string(tv.name)
// TODO(edd): will be too small likely. Find a hint?
result.Values = make([]KeyValue, 0, len(tv.values))
for ki, key := range tvs[0].keys {
for _, value := range tvs[0].values[ki] {
result.Values = append(result.Values, KeyValue{Key: key, Value: value})
}
}
return result
}
result.Measurement = string(tvs[0].name)
var maxSize int
for _, tv := range tvs {
if len(tv.values) > maxSize {
maxSize = len(tv.values)
}
}
result.Values = make([]KeyValue, 0, maxSize) // This will likely be too small but it's a start.
// Resize and reset to the number of TagValues we're merging.
valueIdxs = valueIdxs[:len(tvs)]
for i := 0; i < len(valueIdxs); i++ {
valueIdxs[i][0], valueIdxs[i][1] = 0, 0
}
var (
j int
keyCmp, valCmp int
)
for {
// Which of the provided TagValue sets currently holds the smallest element.
// j is the candidate we're going to next pick for the result set.
j = -1
// Find the smallest element
for i := 0; i < len(tvs); i++ {
if valueIdxs[i][0] >= len(tvs[i].keys) {
continue // We have completely drained all tag keys and values for this shard.
} else if len(tvs[i].values[valueIdxs[i][0]]) == 0 {
// There are no tag values for these keys.
valueIdxs[i][0]++
valueIdxs[i][1] = 0
continue
} else if j == -1 {
// We haven't picked a best TagValues set yet. Pick this one.
j = i
continue
}
// If this tag key is lower than the candidate's tag key
keyCmp = strings.Compare(tvs[i].keys[valueIdxs[i][0]], tvs[j].keys[valueIdxs[j][0]])
if keyCmp == -1 {
j = i
} else if keyCmp == 0 {
valCmp = strings.Compare(tvs[i].values[valueIdxs[i][0]][valueIdxs[i][1]], tvs[j].values[valueIdxs[j][0]][valueIdxs[j][1]])
// Same tag key but this tag value is lower than the candidate.
if valCmp == -1 {
j = i
} else if valCmp == 0 {
// Duplicate tag key/value pair.... Remove and move onto
// the next value for shard i.
valueIdxs[i][1]++
if valueIdxs[i][1] >= len(tvs[i].values[valueIdxs[i][0]]) {
// Drained all these tag values, move onto next key.
valueIdxs[i][0]++
valueIdxs[i][1] = 0
}
}
}
}
// We could have drained all of the TagValue sets and be done...
if j == -1 {
break
}
// Append the smallest KeyValue
result.Values = append(result.Values, KeyValue{
Key: string(tvs[j].keys[valueIdxs[j][0]]),
Value: tvs[j].values[valueIdxs[j][0]][valueIdxs[j][1]],
})
// Increment the indexes for the chosen TagValue.
valueIdxs[j][1]++
if valueIdxs[j][1] >= len(tvs[j].values[valueIdxs[j][0]]) {
// Drained all these tag values, move onto next key.
valueIdxs[j][0]++
valueIdxs[j][1] = 0
for ki, key := range tv.keys {
for _, value := range tv.values[ki] {
result.Values = append(result.Values, KeyValue{Key: key, Value: value})
}
}
return result

View File

@ -1,167 +0,0 @@
package tsdb
import (
"fmt"
"reflect"
"sort"
"testing"
)
// TestStore_mergeTagValues exercises mergeTagValues, the k-way merge that
// combines per-shard tagValues sets for a single measurement into one
// de-duplicated, sorted TagValues result.
func TestStore_mergeTagValues(t *testing.T) {
// Table of inputs (per-shard tagValues) and the expected merged output.
examples := []struct {
in []tagValues
out TagValues
}{
// Degenerate cases: no input at all, and empty tagValues entries.
{},
{in: make([]tagValues, 4), out: TagValues{Values: []KeyValue{}}},
// A single shard's values pass through unchanged.
{
in: []tagValues{createtagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}})},
out: createTagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}}),
},
// Identical sets from two shards must be de-duplicated.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}}),
},
out: createTagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}}),
},
// Partially-overlapping value sets merge into a sorted union.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{"host": {"server-a", "server-d", "server-e"}}),
},
out: createTagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c", "server-d", "server-e"}}),
},
// An empty set in the middle of the inputs is skipped over.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"host": {"server-a"}}),
createtagValues("m0", map[string][]string{}),
createtagValues("m0", map[string][]string{"host": {"server-a"}}),
},
out: createTagValues("m0", map[string][]string{"host": {"server-a"}}),
},
// Five shards with overlapping values for one key.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"host": {"server-q", "server-z"}}),
createtagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{"host": {"server-a", "server-d", "server-e"}}),
createtagValues("m0", map[string][]string{"host": {"server-e", "server-q", "server-z"}}),
createtagValues("m0", map[string][]string{"host": {"server-a"}}),
},
out: createTagValues("m0", map[string][]string{"host": {"server-a", "server-b", "server-c", "server-d", "server-e", "server-q", "server-z"}}),
},
// A shared key ("a") plus per-shard unique keys: the merge must
// union the shared key's values and preserve each unique key.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"a": {"0", "1"}, "host1": {"server-q", "server-z"}}),
createtagValues("m0", map[string][]string{"a": {"0", "2"}, "host2": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{"a": {"0", "3"}, "host3": {"server-a", "server-d", "server-e"}}),
createtagValues("m0", map[string][]string{"a": {"0", "4"}, "host4": {"server-e", "server-q", "server-z"}}),
createtagValues("m0", map[string][]string{"a": {"0", "5"}, "host5": {"server-a"}}),
},
out: createTagValues("m0", map[string][]string{
"a": {"0", "1", "2", "3", "4", "5"},
"host1": {"server-q", "server-z"},
"host2": {"server-a", "server-b", "server-c"},
"host3": {"server-a", "server-d", "server-e"},
"host4": {"server-e", "server-q", "server-z"},
"host5": {"server-a"},
}),
},
// Multiple keys present in both shards.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"region": {"east-1", "west-1"}, "host": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{"region": {"north-1", "west-1"}, "host": {"server-a", "server-d", "server-e"}}),
},
out: createTagValues("m0", map[string][]string{
"host": {"server-a", "server-b", "server-c", "server-d", "server-e"},
"region": {"east-1", "north-1", "west-1"},
}),
},
// Disjoint key sets, in both orderings, to check the merge is not
// sensitive to which shard contributes which keys first.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"region": {"east-1", "west-1"}, "host": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{"city": {"Baltimore", "Las Vegas"}}),
},
out: createTagValues("m0", map[string][]string{
"city": {"Baltimore", "Las Vegas"},
"host": {"server-a", "server-b", "server-c"},
"region": {"east-1", "west-1"},
}),
},
{
in: []tagValues{
createtagValues("m0", map[string][]string{"city": {"Baltimore", "Las Vegas"}}),
createtagValues("m0", map[string][]string{"region": {"east-1", "west-1"}, "host": {"server-a", "server-b", "server-c"}}),
},
out: createTagValues("m0", map[string][]string{
"city": {"Baltimore", "Las Vegas"},
"host": {"server-a", "server-b", "server-c"},
"region": {"east-1", "west-1"},
}),
},
// One populated shard merged with a completely empty one.
{
in: []tagValues{
createtagValues("m0", map[string][]string{"region": {"east-1", "west-1"}, "host": {"server-a", "server-b", "server-c"}}),
createtagValues("m0", map[string][]string{}),
},
out: createTagValues("m0", map[string][]string{
"host": {"server-a", "server-b", "server-c"},
"region": {"east-1", "west-1"},
}),
},
}
// Scratch buffer reused across calls, as mergeTagValues expects.
buf := make([][2]int, 10)
for i, example := range examples {
t.Run(fmt.Sprintf("example_%d", i+1), func(t *testing.T) {
if got, exp := mergeTagValues(buf, example.in...), example.out; !reflect.DeepEqual(got, exp) {
t.Fatalf("\ngot\n %#v\n\n expected\n %#v", got, exp)
}
})
}
}
// createtagValues builds a tagValues fixture from a map of tag key to
// tag values, with keys and each value slice sorted ascending (note:
// the value slices from kvs are sorted in place).
func createtagValues(mname string, kvs map[string][]string) tagValues {
	tv := tagValues{
		name:   []byte(mname),
		keys:   make([]string, 0, len(kvs)),
		values: make([][]string, len(kvs)),
	}
	for key := range kvs {
		tv.keys = append(tv.keys, key)
	}
	sort.Strings(tv.keys)
	for i, key := range tv.keys {
		vals := kvs[key]
		sort.Strings(vals)
		tv.values[i] = vals
	}
	return tv
}
// createTagValues builds an expected TagValues fixture from a map of
// tag key to tag values. The resulting KeyValue slice is sorted, since
// that is how the Store provides results.
func createTagValues(mname string, kvs map[string][]string) TagValues {
	// Pre-size the slice to the total number of key/value pairs.
	var sz int
	for _, v := range kvs {
		sz += len(v)
	}
	out := TagValues{
		Measurement: mname,
		Values:      make([]KeyValue, 0, sz),
	}
	for tk, tvs := range kvs {
		for _, tv := range tvs {
			out.Values = append(out.Values, KeyValue{Key: tk, Value: tv})
		}
	}
	// Sort once after collecting all pairs. The previous version sorted
	// inside the loop, redundantly re-sorting the accumulated slice for
	// every tag key; a single final sort produces the identical order.
	sort.Sort(KeyValues(out.Values))
	return out
}