fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934)

This fixes multi measurement queries that go through the storage service
to correctly pick up all series that apply with the filter. Previously,
negative queries such as `!=`, `!~`, and predicates attempting to match
empty tags did not work correctly with the storage service when multiple
measurements or `OR` conditions were included.

This was because these predicates would be categorized as "multiple
measurements" and then it would attempt to use the field keys iterator
to find the fields for each measurement. The meta queries for these did
not correctly account for negative equality operators or empty tags when
finding appropriate measurements and those could not be changed because
it would cause a breaking change to influxql too.

This modifies the storage service to use new methods that correctly
account for the above situations rather than the field keys iterator.

Some queries that appeared to be single measurement queries also get
considered as multiple measurement queries. Any query with an `OR`
condition will be considered a multiple measurement query.

This bug did not apply to single measurement queries where one
measurement was selected and all of the logical operators were `AND`
values. This is because it used a different code path that correctly
handled these situations.

Backport of #19566.

(cherry picked from commit ceead88bd5)

Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
pull/20951/head
Sam Arnold 2021-03-12 16:34:14 -05:00 committed by GitHub
parent 3eb4fdaf33
commit 04f4817aae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 468 additions and 79 deletions

1
go.mod
View File

@ -25,7 +25,6 @@ require (
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada
github.com/mattn/go-isatty v0.0.12
github.com/opentracing/opentracing-go v1.1.0
github.com/paulbellamy/ratecounter v0.2.0
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1

View File

@ -2,8 +2,6 @@ package storage
import (
"context"
"errors"
"sort"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
@ -112,22 +110,21 @@ func newIndexSeriesCursor(ctx context.Context, predicate *datatypes.Predicate, s
return p, nil
}
var (
itr query.Iterator
fi query.FloatIterator
)
if itr, err = sg.CreateIterator(ctx, &influxql.Measurement{SystemIterator: "_fieldKeys"}, opt); itr != nil && err == nil {
if fi, err = toFloatIterator(itr); err != nil {
goto CLEANUP
}
p.fields = extractFields(fi)
fi.Close()
if len(p.fields) == 0 {
goto CLEANUP
}
return p, nil
var mfkeys map[string][]string
mfkeys, err = sg.FieldKeysByPredicate(opt.Condition)
if err != nil {
goto CLEANUP
}
p.fields = make(map[string][]field, len(mfkeys))
for name, fkeys := range mfkeys {
fields := make([]field, 0, len(fkeys))
for _, key := range fkeys {
fields = append(fields, field{n: key, nb: []byte(key)})
}
p.fields[name] = fields
}
return p, nil
}
CLEANUP:
@ -230,57 +227,3 @@ type field struct {
n string
nb []byte
}
func extractFields(itr query.FloatIterator) measurementFields {
mf := make(measurementFields)
for {
p, err := itr.Next()
if err != nil {
return nil
} else if p == nil {
break
}
// Aux is populated by `fieldKeysIterator#Next`
fields := append(mf[p.Name], field{
n: p.Aux[0].(string),
})
mf[p.Name] = fields
}
if len(mf) == 0 {
return nil
}
for k, fields := range mf {
sort.Slice(fields, func(i, j int) bool {
return fields[i].n < fields[j].n
})
// deduplicate
i := 1
fields[0].nb = []byte(fields[0].n)
for j := 1; j < len(fields); j++ {
if fields[j].n != fields[j-1].n {
fields[i] = fields[j]
fields[i].nb = []byte(fields[i].n)
i++
}
}
mf[k] = fields[:i]
}
return mf
}
func toFloatIterator(iter query.Iterator) (query.FloatIterator, error) {
sitr, ok := iter.(query.FloatIterator)
if !ok {
return nil, errors.New("expected FloatIterator")
}
return sitr, nil
}

View File

@ -1353,7 +1353,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E
// Determine if there exists at least one authorised series for the
// measurement name.
if is.measurementAuthorizedSeries(auth, e) {
if is.measurementAuthorizedSeries(auth, e, nil) {
names = append(names, e)
}
}
@ -1457,7 +1457,7 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influx
matched = !regex.Match(e)
}
if matched && is.measurementAuthorizedSeries(auth, e) {
if matched && is.measurementAuthorizedSeries(auth, e, nil) {
names = append(names, e)
}
}
@ -1465,6 +1465,116 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influx
return names, nil
}
// MeasurementNamesByPredicate returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
// This behaves differently from MeasurementNamesByExpr because it will
// return measurements using flux predicates.
func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByPredicate(auth, expr)
if err != nil {
return nil, err
}
return slices.CopyChunkedByteSlices(names, 1000), nil
}
itr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()
// Iterate over all measurements if no condition exists.
var names [][]byte
for {
e, err := itr.Next()
if err != nil {
return nil, err
} else if e == nil {
break
}
// Determine if there exists at least one authorised series for the
// measurement name.
if is.measurementAuthorizedSeries(auth, e, nil) {
names = append(names, e)
}
}
return slices.CopyChunkedByteSlices(names, 1000), nil
}
func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
if expr == nil {
return nil, nil
}
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok {
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
}
// Retrieve value or regex expression from RHS.
var value string
var regex *regexp.Regexp
if influxql.IsRegexOp(e.Op) {
re, ok := e.RHS.(*influxql.RegexLiteral)
if !ok {
return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
}
regex = re.Val
} else {
s, ok := e.RHS.(*influxql.StringLiteral)
if !ok {
return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
}
value = s.Val
}
// Match on name, if specified.
if tag.Val == "_name" {
return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
} else if influxql.IsSystemName(tag.Val) {
return nil, nil
}
return is.measurementNamesByTagPredicate(auth, e.Op, tag.Val, value, regex)
case influxql.OR, influxql.AND:
lhs, err := is.measurementNamesByPredicate(auth, e.LHS)
if err != nil {
return nil, err
}
rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
if err != nil {
return nil, err
}
if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
}
return bytesutil.Intersect(lhs, rhs), nil
default:
return nil, fmt.Errorf("invalid tag comparison operator")
}
case *influxql.ParenExpr:
return is.measurementNamesByPredicate(auth, e.Expr)
default:
return nil, fmt.Errorf("%#v", expr)
}
}
func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
var names [][]byte
@ -1573,7 +1683,7 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
// an authorized series belonging to the measurement must be located.
// Then, the measurement can be added iff !tagMatch && authorized.
if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch {
authorized = is.measurementAuthorizedSeries(auth, me)
authorized = is.measurementAuthorizedSeries(auth, me, nil)
}
// tags match | operation is EQ | measurement matches
@ -1592,13 +1702,80 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
return names, nil
}
func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
var names [][]byte
mitr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error)
switch op {
case influxql.EQ:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
return is.measurementHasTagValue(auth, me, []byte(key), []byte(val))
}
case influxql.NEQ:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
// If there is an authorized series in this measurement and that series
// does not contain the tag key/value.
ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
return tags.GetString(key) == val
})
return ok, nil
}
case influxql.EQREGEX:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
return is.measurementHasTagValueRegex(auth, me, []byte(key), regex)
}
case influxql.NEQREGEX:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
// If there is an authorized series in this measurement and that series
// does not contain the tag key/value.
ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
return regex.MatchString(tags.GetString(key))
})
return ok, nil
}
default:
return nil, fmt.Errorf("unsupported operand: %s", op)
}
for {
me, err := mitr.Next()
if err != nil {
return nil, err
} else if me == nil {
break
}
ok, err := checkMeasurement(auth, me)
if err != nil {
return nil, err
} else if ok {
names = append(names, me)
}
}
bytesutil.Sort(names)
return names, nil
}
// measurementAuthorizedSeries determines if the measurement contains a series
// that is authorized to be read.
func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte) bool {
if query.AuthorizerIsOpen(auth) {
func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte, exclude func(tags models.Tags) bool) bool {
if query.AuthorizerIsOpen(auth) && exclude == nil {
return true
}
if auth == nil {
auth = query.OpenAuthorizer
}
sitr, err := is.measurementSeriesIDIterator(name)
if err != nil || sitr == nil {
return false
@ -1618,11 +1795,151 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt
name, tags := is.SeriesFile.Series(series.SeriesID)
if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
if exclude != nil && exclude(tags) {
continue
}
return true
}
}
}
func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value []byte) (bool, error) {
if len(value) == 0 {
return is.measurementHasEmptyTagValue(auth, me, key)
}
hasTagValue, err := is.HasTagValue(me, key, value)
if err != nil || !hasTagValue {
return false, err
}
// If the authorizer is open, return true.
if query.AuthorizerIsOpen(auth) {
return true, nil
}
// When an authorizer is present, the measurement should be
// included only if one of it's series is authorized.
sitr, err := is.tagValueSeriesIDIterator(me, key, value)
if err != nil || sitr == nil {
return false, err
}
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
// Locate a series with this matching tag value that's authorized.
for {
se, err := sitr.Next()
if err != nil || se.SeriesID == 0 {
return false, err
}
name, tags := is.SeriesFile.Series(se.SeriesID)
if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
return true, nil
}
}
}
func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []byte) (bool, error) {
// Any series that does not have a tag key
// has an empty tag value for that key.
// Iterate through all of the series to find one
// series that does not have the tag key.
sitr, err := is.measurementSeriesIDIterator(me)
if err != nil || sitr == nil {
return false, err
}
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
for {
series, err := sitr.Next()
if err != nil || series.SeriesID == 0 {
return false, err
}
name, tags := is.SeriesFile.Series(series.SeriesID)
if len(tags.Get(key)) > 0 {
// The tag key exists in this series. We need
// at least one series that does not have the tag
// keys.
continue
}
// Verify that we can see this series.
if query.AuthorizerIsOpen(auth) {
return true, nil
} else if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
return true, nil
}
}
}
func (is IndexSet) measurementHasTagValueRegex(auth query.Authorizer, me, key []byte, value *regexp.Regexp) (bool, error) {
// If the regex matches the empty string, do a special check to see
// if we have an empty tag value.
if matchEmpty := value.MatchString(""); matchEmpty {
if ok, err := is.measurementHasEmptyTagValue(auth, me, key); err != nil {
return false, err
} else if ok {
return true, nil
}
}
// Iterate over the tag values and find one that matches the value.
vitr, err := is.tagValueIterator(me, key)
if err != nil || vitr == nil {
return false, err
}
defer vitr.Close()
for {
ve, err := vitr.Next()
if err != nil || ve == nil {
return false, err
}
if !value.Match(ve) {
// The regex does not match this tag value.
continue
}
// If the authorizer is open, then we have found a suitable tag value.
if query.AuthorizerIsOpen(auth) {
return true, nil
}
// When an authorizer is present, the measurement should only be included
// if one of the series is authorized.
if authorized, err := func() (bool, error) {
sitr, err := is.tagValueSeriesIDIterator(me, key, ve)
if err != nil || sitr == nil {
return false, err
}
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
// Locate an authorized series.
for {
se, err := sitr.Next()
if err != nil || se.SeriesID == 0 {
return false, err
}
name, tags := is.SeriesFile.Series(se.SeriesID)
if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
return true, nil
}
}
}(); err != nil {
return false, err
} else if authorized {
return true, nil
}
}
}
// HasTagKey returns true if the tag key exists in any index for the provided
// measurement.
func (is IndexSet) HasTagKey(name, key []byte) (bool, error) {

View File

@ -137,6 +137,96 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
}
}
func TestIndexSet_MeasurementNamesByPredicate(t *testing.T) {
// Setup indexes
indexes := map[string]*Index{}
for _, name := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(name)
idx.AddSeries("cpu", map[string]string{"region": "east"})
idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"})
idx.AddSeries("disk", map[string]string{"secret": "foo"})
idx.AddSeries("mem", map[string]string{"region": "west"})
idx.AddSeries("gpu", map[string]string{"region": "east"})
idx.AddSeries("pci", map[string]string{"region": "east", "secret": "foo"})
indexes[name] = idx
defer idx.Close()
}
authorizer := &internal.AuthorizerMock{
AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
if tags.GetString("secret") != "" {
t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
return false
}
return true
},
}
type example struct {
name string
expr influxql.Expr
expected [][]byte
}
// These examples should be run without any auth.
examples := []example{
{name: "all", expected: slices.StringsToBytes("cpu", "disk", "gpu", "mem", "pci")},
{name: "EQ", expr: influxql.MustParseExpr(`region = 'west'`), expected: slices.StringsToBytes("cpu", "mem")},
{name: "NEQ", expr: influxql.MustParseExpr(`region != 'west'`), expected: slices.StringsToBytes("cpu", "disk", "gpu", "pci")},
{name: "EQREGEX", expr: influxql.MustParseExpr(`region =~ /.*st/`), expected: slices.StringsToBytes("cpu", "gpu", "mem", "pci")},
{name: "NEQREGEX", expr: influxql.MustParseExpr(`region !~ /.*est/`), expected: slices.StringsToBytes("cpu", "disk", "gpu", "pci")},
// None of the series have this tag so all should be selected.
{name: "EQ empty", expr: influxql.MustParseExpr(`host = ''`), expected: slices.StringsToBytes("cpu", "disk", "gpu", "mem", "pci")},
// Measurements that have this tag at all should be returned.
{name: "NEQ empty", expr: influxql.MustParseExpr(`region != ''`), expected: slices.StringsToBytes("cpu", "gpu", "mem", "pci")},
{name: "EQREGEX empty", expr: influxql.MustParseExpr(`host =~ /.*/`), expected: slices.StringsToBytes("cpu", "disk", "gpu", "mem", "pci")},
{name: "NEQ empty", expr: influxql.MustParseExpr(`region !~ /.*/`), expected: slices.StringsToBytes()},
}
// These examples should be run with the authorizer.
authExamples := []example{
{name: "all", expected: slices.StringsToBytes("cpu", "gpu", "mem")},
{name: "EQ", expr: influxql.MustParseExpr(`region = 'west'`), expected: slices.StringsToBytes("mem")},
{name: "NEQ", expr: influxql.MustParseExpr(`region != 'west'`), expected: slices.StringsToBytes("cpu", "gpu")},
{name: "EQREGEX", expr: influxql.MustParseExpr(`region =~ /.*st/`), expected: slices.StringsToBytes("cpu", "gpu", "mem")},
{name: "NEQREGEX", expr: influxql.MustParseExpr(`region !~ /.*est/`), expected: slices.StringsToBytes("cpu", "gpu")},
{name: "EQ empty", expr: influxql.MustParseExpr(`host = ''`), expected: slices.StringsToBytes("cpu", "gpu", "mem")},
{name: "NEQ empty", expr: influxql.MustParseExpr(`region != ''`), expected: slices.StringsToBytes("cpu", "gpu", "mem")},
{name: "EQREGEX empty", expr: influxql.MustParseExpr(`host =~ /.*/`), expected: slices.StringsToBytes("cpu", "gpu", "mem")},
{name: "NEQ empty", expr: influxql.MustParseExpr(`region !~ /.*/`), expected: slices.StringsToBytes()},
}
for _, idx := range tsdb.RegisteredIndexes() {
t.Run(idx, func(t *testing.T) {
t.Run("no authorization", func(t *testing.T) {
for _, example := range examples {
t.Run(example.name, func(t *testing.T) {
names, err := indexes[idx].IndexSet().MeasurementNamesByPredicate(nil, example.expr)
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, example.expected) {
t.Fatalf("got names: %v, expected %v", slices.BytesToStrings(names), slices.BytesToStrings(example.expected))
}
})
}
})
t.Run("with authorization", func(t *testing.T) {
for _, example := range authExamples {
t.Run(example.name, func(t *testing.T) {
names, err := indexes[idx].IndexSet().MeasurementNamesByPredicate(authorizer, example.expr)
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, example.expected) {
t.Fatalf("got names: %v, expected %v", slices.BytesToStrings(names), slices.BytesToStrings(example.expected))
}
})
}
})
})
}
}
func TestIndexSet_DedupeInmemIndexes(t *testing.T) {
testCases := []struct {
tsiN int // Quantity of TSI indexes

View File

@ -801,9 +801,17 @@ func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
return engine.MeasurementNamesByRegex(re)
}
// MeasurementNamesByPredicate returns fields for a measurement filtered by an expression.
func (s *Shard) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error) {
index, err := s.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
return indexSet.MeasurementNamesByPredicate(query.OpenAuthorizer, expr)
}
// MeasurementFields returns fields for a measurement.
// TODO(edd): This method is currently only being called from tests; do we
// really need it?
func (s *Shard) MeasurementFields(name []byte) *MeasurementFields {
engine, err := s.Engine()
if err != nil {
@ -1258,6 +1266,38 @@ func (a Shards) FieldKeysByMeasurement(name []byte) []string {
return slices.MergeSortedStrings(all...)
}
// MeasurementNamesByPredicate returns the measurements that match the given predicate.
func (a Shards) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error) {
if len(a) == 1 {
return a[0].MeasurementNamesByPredicate(expr)
}
all := make([][][]byte, len(a))
for i, shard := range a {
names, err := shard.MeasurementNamesByPredicate(expr)
if err != nil {
return nil, err
}
all[i] = names
}
return slices.MergeSortedBytes(all...), nil
}
// FieldKeysByPredicate returns the field keys for series that match
// the given predicate.
func (a Shards) FieldKeysByPredicate(expr influxql.Expr) (map[string][]string, error) {
names, err := a.MeasurementNamesByPredicate(expr)
if err != nil {
return nil, err
}
all := make(map[string][]string, len(names))
for _, name := range names {
all[string(name)] = a.FieldKeysByMeasurement(name)
}
return all, nil
}
func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
fields = make(map[string]influxql.DataType)
dimensions = make(map[string]struct{})