Fix MeasurementNamesByExpr in tsi1

pull/9127/head
Edd Robinson 2017-11-16 17:22:00 +00:00
parent 3967e78885
commit 25f0fedd6f
1 changed files with 61 additions and 24 deletions

View File

@ -549,7 +549,7 @@ func (fs *FileSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E
// Iterate over all measurements if no condition exists.
var names [][]byte
for e := itr.Next(); e != nil; e = itr.Next() {
if fs.measurementAuthorized(auth, e.Name()) {
if fs.measurementAuthorizedSeries(auth, e.Name()) {
names = append(names, e.Name())
}
}
@ -643,7 +643,7 @@ func (fs *FileSet) measurementNamesByNameFilter(auth query.Authorizer, op influx
matched = !regex.Match(e.Name())
}
if matched && fs.measurementAuthorized(auth, e.Name()) {
if matched && fs.measurementAuthorizedSeries(auth, e.Name()) {
names = append(names, e.Name())
}
}
@ -659,37 +659,75 @@ func (fs *FileSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
return nil
}
for me := mitr.Next(); me != nil; me = mitr.Next() {
// If the operator is non-regex, only check the specified value.
var tagMatch bool
// valEqual determins if the provided []byte is equal to the tag value
// to be filtered on.
valEqual := func(b []byte) bool {
if op == influxql.EQ || op == influxql.NEQ {
if fs.HasTagValue(me.Name(), []byte(key), []byte(val)) {
tagMatch = true
}
} else {
// Else, the operator is a regex and we have to check all tag
// values against the regular expression.
vitr := fs.TagValueIterator(me.Name(), []byte(key))
if vitr != nil {
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
if regex.Match(ve.Value()) {
tagMatch = true
return bytes.Equal([]byte(val), b)
}
return regex.Match(b)
}
var tagMatch bool
var authorized bool
for me := mitr.Next(); me != nil; me = mitr.Next() {
// If the measurement doesn't have the tag key, then it won't be considered.
if !fs.HasTagKey(me.Name(), []byte(key)) {
continue
}
tagMatch = false
authorized = true
if auth != nil {
authorized = false // Authorization must be explicitly granted.
}
vitr := fs.TagValueIterator(me.Name(), []byte(key))
if vitr != nil {
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
// If a match was found for the EQ or EQREGEX operator then
// the measurement should be included (if it's authorized).
if valEqual(ve.Value()) {
tagMatch = true
if auth == nil {
break
}
// Is there a series with this matching tag value that is
// authorized to be read?
sitr := fs.TagValueSeriesIterator(me.Name(), []byte(key), ve.Value())
if sitr == nil {
continue
}
for se := sitr.Next(); se != nil; se = sitr.Next() {
if auth.AuthorizeSeriesRead(fs.database, me.Name(), se.Tags()) {
authorized = true
break
}
}
if tagMatch && authorized {
// The measurement can definitely be included or rejected.
break
}
}
}
}
//
// XNOR gate
//
// For negation operators, to determine if the measurement is authorized,
// an authorized series belonging to the measurement must located.
// Then, the measurement can be added iff !tagMatch && authorized.
if op == influxql.NEQ || op == influxql.NEQREGEX && !tagMatch {
authorized = fs.measurementAuthorizedSeries(auth, me.Name())
}
// tags match | operation is EQ | measurement matches
// --------------------------------------------------
// True | True | True
// True | False | False
// False | True | False
// False | False | True
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && fs.measurementAuthorized(auth, me.Name()) {
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized {
names = append(names, me.Name())
continue
}
@ -699,10 +737,9 @@ func (fs *FileSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
return names
}
// measurementAuthorized determines if the measurement is authorized to be read.
// A measurment is authorised to be read if at least one of the measurement's
// series is authorised to be read.
func (fs *FileSet) measurementAuthorized(auth query.Authorizer, name []byte) bool {
// measurementAuthorizedSeries determines if the measurement contains a series
// that is authorized to be read.
func (fs *FileSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte) bool {
if auth == nil {
return true
}