fix(storage): Ensure _field tag key has special handling

Fixes #19120
pull/19446/head
Stuart Carnie 2020-08-21 13:06:00 -07:00
parent c01a62d5d4
commit afd5120221
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
1 changed files with 71 additions and 0 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/storage/reads"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/v1/services/meta"
@ -261,6 +262,9 @@ func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest)
MeasurementsSource: req.TagsSource,
Predicate: req.Predicate,
})
case "_field":
return s.measurementFields(ctx, req)
}
}
@ -409,3 +413,70 @@ func (s *Store) GetSource(orgID, bucketID uint64) proto.Message {
OrganizationID: orgID,
}
}
func (s *Store) measurementFields(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) {
source, err := getReadSource(*req.TagsSource)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End)
if err != nil {
return nil, err
}
shardIDs, err := s.findShardIDs(database, rp, false, start, end)
if err != nil {
return nil, err
}
if len(shardIDs) == 0 {
return cursors.EmptyStringIterator, nil
}
var expr influxql.Expr
if root := req.Predicate.GetRoot(); root != nil {
var err error
expr, err = reads.NodeToExpr(root, measurementRemap)
if err != nil {
return nil, err
}
if found := reads.HasFieldValueKey(expr); found {
return nil, errors.New("field values unsupported")
}
expr = influxql.Reduce(influxql.CloneExpr(expr), nil)
if reads.IsTrueBooleanLiteral(expr) {
expr = nil
}
}
sg := s.TSDBStore.ShardGroup(shardIDs)
ms := &influxql.Measurement{
Database: database,
RetentionPolicy: rp,
SystemIterator: "_fieldKeys",
}
opts := query.IteratorOptions{
OrgID: influxdb.ID(source.OrganizationID),
Condition: expr,
Authorizer: query.OpenAuthorizer,
}
iter, err := sg.CreateIterator(ctx, ms, opts)
if err != nil {
return nil, err
}
defer func() { _ = iter.Close() }()
var fieldNames []string
fitr := iter.(query.FloatIterator)
for p, _ := fitr.Next(); p != nil; p, _ = fitr.Next() {
if len(p.Aux) >= 1 {
fieldNames = append(fieldNames, p.Aux[0].(string))
}
}
sort.Strings(fieldNames)
fieldNames = slices.MergeSortedStrings(fieldNames)
return cursors.NewStringSliceIterator(fieldNames), nil
}