feat(tsm1): Implementation of MeasurementFields
This commit provides an implementation of the MeasurementFields API per the design previously outlined.pull/17589/head
parent
7de6383adf
commit
cb618efc65
|
|
@ -4,11 +4,12 @@ package cursors
|
|||
type FieldType int
|
||||
|
||||
const (
|
||||
Float FieldType = iota // means the data type is a float
|
||||
Integer // means the data type is an integer
|
||||
Unsigned // means the data type is an unsigned integer
|
||||
Boolean // means the data type is a boolean
|
||||
String // means the data type is a string of text
|
||||
Float FieldType = iota // means the data type is a float
|
||||
Integer // means the data type is an integer
|
||||
Unsigned // means the data type is an unsigned integer
|
||||
Boolean // means the data type is a boolean
|
||||
String // means the data type is a string of text
|
||||
Undefined // means the data type in unknown or undefined
|
||||
)
|
||||
|
||||
type MeasurementField struct {
|
||||
|
|
@ -20,8 +21,8 @@ type MeasurementFields struct {
|
|||
Fields []MeasurementField
|
||||
}
|
||||
|
||||
type MeasurementFieldsCursor interface {
|
||||
// Next advances the MeasurementFieldsCursor to the next value. It returns false
|
||||
type MeasurementFieldsIterator interface {
|
||||
// Next advances the iterator to the next value. It returns false
|
||||
// when there are no more values.
|
||||
Next() bool
|
||||
|
||||
|
|
@ -30,3 +31,75 @@ type MeasurementFieldsCursor interface {
|
|||
|
||||
Stats() CursorStats
|
||||
}
|
||||
|
||||
// EmptyMeasurementFieldsIterator is an implementation of MeasurementFieldsIterator that returns
|
||||
// no values.
|
||||
var EmptyMeasurementFieldsIterator = &measurementFieldsIterator{}
|
||||
|
||||
type measurementFieldsIterator struct{}
|
||||
|
||||
func (m *measurementFieldsIterator) Next() bool { return false }
|
||||
func (m *measurementFieldsIterator) Value() MeasurementFields { return MeasurementFields{} }
|
||||
func (m *measurementFieldsIterator) Stats() CursorStats { return CursorStats{} }
|
||||
|
||||
type MeasurementFieldsSliceIterator struct {
|
||||
f []MeasurementFields
|
||||
v MeasurementFields
|
||||
i int
|
||||
stats CursorStats
|
||||
}
|
||||
|
||||
func NewMeasurementFieldsSliceIteratorWithStats(f []MeasurementFields, stats CursorStats) *MeasurementFieldsSliceIterator {
|
||||
return &MeasurementFieldsSliceIterator{f: f, stats: stats}
|
||||
}
|
||||
|
||||
func (s *MeasurementFieldsSliceIterator) Next() bool {
|
||||
if s.i < len(s.f) {
|
||||
s.v = s.f[s.i]
|
||||
s.i++
|
||||
return true
|
||||
}
|
||||
s.v = MeasurementFields{}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *MeasurementFieldsSliceIterator) Value() MeasurementFields {
|
||||
return s.v
|
||||
}
|
||||
|
||||
func (s *MeasurementFieldsSliceIterator) Stats() CursorStats {
|
||||
return s.stats
|
||||
}
|
||||
|
||||
func (s *MeasurementFieldsSliceIterator) toSlice() []MeasurementFields {
|
||||
if s.i < len(s.f) {
|
||||
return s.f[s.i:]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MeasurementFieldsIteratorFlatMap reads the remainder of i, flattening the results
|
||||
// to a single slice.
|
||||
func MeasurementFieldsIteratorFlatMap(i MeasurementFieldsIterator) []MeasurementField {
|
||||
if i == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var res []MeasurementField
|
||||
if si, ok := i.(*MeasurementFieldsSliceIterator); ok {
|
||||
s := si.toSlice()
|
||||
sz := 0
|
||||
for i := range s {
|
||||
sz += len(s[i].Fields)
|
||||
}
|
||||
res = make([]MeasurementField, 0, sz)
|
||||
for i := range s {
|
||||
res = append(res, s[i].Fields...)
|
||||
}
|
||||
} else {
|
||||
for i.Next() {
|
||||
res = append(res, i.Value().Fields...)
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
|
||||
"github.com/influxdata/influxql"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
|
@ -74,6 +75,8 @@ func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.
|
|||
return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
|
||||
}
|
||||
|
||||
var ts cursors.TimestampArray
|
||||
|
||||
// With performance in mind, we explicitly do not check the context
|
||||
// while scanning the entries in the cache.
|
||||
prefixStr := string(prefix)
|
||||
|
|
@ -94,10 +97,17 @@ func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.
|
|||
return nil
|
||||
}
|
||||
|
||||
stats.ScannedValues += entry.values.Len()
|
||||
stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp
|
||||
ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
|
||||
if ts.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if entry.values.Contains(start, end) {
|
||||
sort.Sort(&ts)
|
||||
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if ts.Contains(start, end) {
|
||||
tsmValues[string(name)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
|
|
@ -149,6 +159,248 @@ func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxd
|
|||
return e.tagKeysPredicate(ctx, orgID, bucketID, []byte(measurement), start, end, predicate)
|
||||
}
|
||||
|
||||
func (e *Engine) MeasurementFields(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
|
||||
if predicate == nil {
|
||||
return e.fieldsNoPredicate(ctx, orgID, bucketID, []byte(measurement), start, end)
|
||||
}
|
||||
|
||||
predicate = AddMeasurementToExpr(measurement, predicate)
|
||||
|
||||
return e.fieldsPredicate(ctx, orgID, bucketID, []byte(measurement), start, end, predicate)
|
||||
}
|
||||
|
||||
type fieldTypeTime struct {
|
||||
typ cursors.FieldType
|
||||
max int64
|
||||
}
|
||||
|
||||
func (e *Engine) fieldsPredicate(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, measurement []byte, start int64, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
|
||||
if err := ValidateTagPredicate(predicate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
orgBucket := tsdb.EncodeName(orgID, bucketID)
|
||||
|
||||
keys, err := e.findCandidateKeys(ctx, orgBucket[:], predicate)
|
||||
if err != nil {
|
||||
return cursors.EmptyMeasurementFieldsIterator, err
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return cursors.EmptyMeasurementFieldsIterator, nil
|
||||
}
|
||||
|
||||
var files []TSMFile
|
||||
defer func() {
|
||||
for _, f := range files {
|
||||
f.Unref()
|
||||
}
|
||||
}()
|
||||
var iters []*TimeRangeMaxTimeIterator
|
||||
|
||||
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
||||
// don't have to remember to get it right everywhere we need to touch TSM data.
|
||||
orgBucketEsc := models.EscapeMeasurement(orgBucket[:])
|
||||
|
||||
mt := models.Tags{models.NewTag(models.MeasurementTagKeyBytes, measurement)}
|
||||
tsmKeyPrefix := mt.AppendHashKey(orgBucketEsc)
|
||||
|
||||
var canceled bool
|
||||
|
||||
e.FileStore.ForEachFile(func(f TSMFile) bool {
|
||||
// Check the context before accessing each tsm file
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
canceled = true
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(tsmKeyPrefix, tsmKeyPrefix) {
|
||||
f.Ref()
|
||||
files = append(files, f)
|
||||
iters = append(iters, f.TimeRangeMaxTimeIterator(tsmKeyPrefix, start, end))
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
var stats cursors.CursorStats
|
||||
|
||||
if canceled {
|
||||
stats = statsFromTimeRangeMaxTimeIters(stats, iters)
|
||||
return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err()
|
||||
}
|
||||
|
||||
tsmValues := make(map[string]fieldTypeTime)
|
||||
|
||||
// reusable buffers
|
||||
var (
|
||||
tags models.Tags
|
||||
keybuf []byte
|
||||
sfkey []byte
|
||||
ts cursors.TimestampArray
|
||||
)
|
||||
|
||||
for i := range keys {
|
||||
// to keep cache scans fast, check context every 'cancelCheckInterval' iteratons
|
||||
if i%cancelCheckInterval == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
stats = statsFromTimeRangeMaxTimeIters(stats, iters)
|
||||
return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
_, tags = seriesfile.ParseSeriesKeyInto(keys[i], tags[:0])
|
||||
fieldKey := tags.Get(models.FieldKeyTagKeyBytes)
|
||||
keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags)
|
||||
sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, fieldKey)
|
||||
|
||||
cur := fieldTypeTime{max: InvalidMinNanoTime}
|
||||
|
||||
ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0])
|
||||
if ts.Len() > 0 {
|
||||
sort.Sort(&ts)
|
||||
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if ts.Contains(start, end) {
|
||||
max := ts.MaxTime()
|
||||
if max > cur.max {
|
||||
cur.max = max
|
||||
cur.typ = BlockTypeToFieldType(e.Cache.BlockType(sfkey))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, iter := range iters {
|
||||
if exact, _ := iter.Seek(sfkey); !exact {
|
||||
continue
|
||||
}
|
||||
|
||||
max := iter.MaxTime()
|
||||
if max > cur.max {
|
||||
cur.max = max
|
||||
cur.typ = BlockTypeToFieldType(iter.Type())
|
||||
}
|
||||
}
|
||||
|
||||
if cur.max != InvalidMinNanoTime {
|
||||
tsmValues[string(fieldKey)] = cur
|
||||
}
|
||||
}
|
||||
|
||||
vals := make([]cursors.MeasurementField, 0, len(tsmValues))
|
||||
for key, val := range tsmValues {
|
||||
vals = append(vals, cursors.MeasurementField{Key: key, Type: val.typ})
|
||||
}
|
||||
|
||||
return cursors.NewMeasurementFieldsSliceIteratorWithStats([]cursors.MeasurementFields{{Fields: vals}}, stats), nil
|
||||
}
|
||||
|
||||
func (e *Engine) fieldsNoPredicate(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, measurement []byte, start int64, end int64) (cursors.MeasurementFieldsIterator, error) {
|
||||
tsmValues := make(map[string]fieldTypeTime)
|
||||
orgBucket := tsdb.EncodeName(orgID, bucketID)
|
||||
|
||||
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
||||
// don't have to remember to get it right everywhere we need to touch TSM data.
|
||||
orgBucketEsc := models.EscapeMeasurement(orgBucket[:])
|
||||
|
||||
mt := models.Tags{models.NewTag(models.MeasurementTagKeyBytes, measurement)}
|
||||
tsmKeyPrefix := mt.AppendHashKey(orgBucketEsc)
|
||||
|
||||
var stats cursors.CursorStats
|
||||
var canceled bool
|
||||
|
||||
e.FileStore.ForEachFile(func(f TSMFile) bool {
|
||||
// Check the context before touching each tsm file
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
canceled = true
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(tsmKeyPrefix, tsmKeyPrefix) {
|
||||
// TODO(sgc): create f.TimeRangeIterator(minKey, maxKey, start, end)
|
||||
iter := f.TimeRangeMaxTimeIterator(tsmKeyPrefix, start, end)
|
||||
for i := 0; iter.Next(); i++ {
|
||||
sfkey := iter.Key()
|
||||
if !bytes.HasPrefix(sfkey, tsmKeyPrefix) {
|
||||
// end of prefix
|
||||
break
|
||||
}
|
||||
|
||||
max := iter.MaxTime()
|
||||
if max == InvalidMinNanoTime {
|
||||
continue
|
||||
}
|
||||
|
||||
_, fieldKey := SeriesAndFieldFromCompositeKey(sfkey)
|
||||
v, ok := tsmValues[string(fieldKey)]
|
||||
if !ok || v.max < max {
|
||||
tsmValues[string(fieldKey)] = fieldTypeTime{
|
||||
typ: BlockTypeToFieldType(iter.Type()),
|
||||
max: max,
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.Add(iter.Stats())
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if canceled {
|
||||
return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err()
|
||||
}
|
||||
|
||||
var ts cursors.TimestampArray
|
||||
|
||||
// With performance in mind, we explicitly do not check the context
|
||||
// while scanning the entries in the cache.
|
||||
tsmKeyPrefixStr := string(tsmKeyPrefix)
|
||||
_ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error {
|
||||
if !strings.HasPrefix(sfkey, tsmKeyPrefixStr) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0])
|
||||
if ts.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sort.Sort(&ts)
|
||||
|
||||
stats.ScannedValues += ts.Len()
|
||||
stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp
|
||||
|
||||
if !ts.Contains(start, end) {
|
||||
return nil
|
||||
}
|
||||
|
||||
max := ts.MaxTime()
|
||||
|
||||
// TODO(edd): consider the []byte() conversion here.
|
||||
_, fieldKey := SeriesAndFieldFromCompositeKey([]byte(sfkey))
|
||||
v, ok := tsmValues[string(fieldKey)]
|
||||
if !ok || v.max < max {
|
||||
tsmValues[string(fieldKey)] = fieldTypeTime{
|
||||
typ: BlockTypeToFieldType(entry.BlockType()),
|
||||
max: max,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
vals := make([]cursors.MeasurementField, 0, len(tsmValues))
|
||||
for key, val := range tsmValues {
|
||||
vals = append(vals, cursors.MeasurementField{Key: key, Type: val.typ})
|
||||
}
|
||||
|
||||
return cursors.NewMeasurementFieldsSliceIteratorWithStats([]cursors.MeasurementFields{{Fields: vals}}, stats), nil
|
||||
}
|
||||
|
||||
func AddMeasurementToExpr(measurement string, base influxql.Expr) influxql.Expr {
|
||||
// \x00 = '<measurement>'
|
||||
expr := &influxql.BinaryExpr{
|
||||
|
|
@ -175,3 +427,10 @@ func AddMeasurementToExpr(measurement string, base influxql.Expr) influxql.Expr
|
|||
|
||||
return expr
|
||||
}
|
||||
|
||||
func statsFromTimeRangeMaxTimeIters(stats cursors.CursorStats, iters []*TimeRangeMaxTimeIterator) cursors.CursorStats {
|
||||
for _, iter := range iters {
|
||||
stats.Add(iter.Stats())
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
|
|
|||
|
|
@ -789,6 +789,277 @@ mem,mem1=v,mem2=v f=1 201`)
|
|||
}
|
||||
}
|
||||
|
||||
func TestEngine_MeasurementFields(t *testing.T) {
|
||||
e, err := NewEngine(tsm1.NewConfig(), t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := e.Open(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
orgs := []struct {
|
||||
org, bucket influxdb.ID
|
||||
}{
|
||||
{
|
||||
org: 0x5020,
|
||||
bucket: 0x5100,
|
||||
},
|
||||
{
|
||||
org: 0x6000,
|
||||
bucket: 0x6100,
|
||||
},
|
||||
}
|
||||
|
||||
// this org will require escaping the 0x20 byte
|
||||
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
|
||||
m00,tag00=v00,tag10=v10 i=1i 101
|
||||
m00,tag00=v00,tag10=v10 i=1i 102
|
||||
m00,tag00=v00,tag10=v10 f=1 101
|
||||
m00,tag00=v00,tag10=v10 i=1i 108
|
||||
m00,tag00=v00,tag10=v10 f=1 109
|
||||
m00,tag00=v00,tag10=v10 i=1i 109
|
||||
m01,tag00=v00,tag10=v10 b=true 101
|
||||
`)
|
||||
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
|
||||
m10,foo=v barF=50 101
|
||||
`)
|
||||
|
||||
// send some points to TSM data
|
||||
e.MustWriteSnapshot()
|
||||
|
||||
// delete some data from the first bucket
|
||||
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 105)
|
||||
|
||||
// leave some points in the cache
|
||||
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
|
||||
m00,tag00=v00,tag10=v10 i=2i 201
|
||||
m00,tag00=v00,tag10=v10 i=2i 202
|
||||
m00,tag00=v00,tag10=v10 f=2 201
|
||||
m00,tag00=v00,tag10=v11 i="s" 202
|
||||
m00,tag00=v00,tag10=v11 i="s" 208
|
||||
m00,tag00=v00,tag10=v11 i="s" 209
|
||||
m01,tag00=v00,tag10=v10 b=true 201
|
||||
`)
|
||||
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
|
||||
m10,foo=v barS="60" 501
|
||||
`)
|
||||
|
||||
type args struct {
|
||||
org int
|
||||
m string
|
||||
min, max int64
|
||||
expr string
|
||||
}
|
||||
|
||||
makeStats := func(v int) cursors.CursorStats {
|
||||
return cursors.CursorStats{
|
||||
ScannedValues: v,
|
||||
ScannedBytes: v * 8,
|
||||
}
|
||||
}
|
||||
|
||||
var tests = []struct {
|
||||
name string
|
||||
args args
|
||||
exp []cursors.MeasurementField
|
||||
expStats cursors.CursorStats
|
||||
}{
|
||||
// ***********************
|
||||
// * queries for the first org, which has some deleted data
|
||||
// ***********************
|
||||
{
|
||||
name: "TSM and cache",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 0,
|
||||
max: 300,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}, {Key: "f", Type: cursors.Float}},
|
||||
expStats: makeStats(12),
|
||||
},
|
||||
{
|
||||
name: "m00 only TSM",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 0,
|
||||
max: 199,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
|
||||
expStats: makeStats(12),
|
||||
},
|
||||
{
|
||||
name: "m01 all time",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m01",
|
||||
min: 0,
|
||||
max: 1000,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "b", Type: cursors.Boolean}},
|
||||
expStats: makeStats(1),
|
||||
},
|
||||
{
|
||||
name: "m10 only TSM",
|
||||
args: args{
|
||||
org: 1,
|
||||
m: "m10",
|
||||
min: 0,
|
||||
max: 199,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "barF", Type: cursors.Float}},
|
||||
expStats: makeStats(1),
|
||||
},
|
||||
{
|
||||
name: "only cache",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 200,
|
||||
max: 299,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}, {Key: "f", Type: cursors.Float}},
|
||||
expStats: makeStats(6),
|
||||
},
|
||||
{
|
||||
name: "one timestamp TSM/data",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 109,
|
||||
max: 109,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
|
||||
expStats: makeStats(6),
|
||||
},
|
||||
{
|
||||
name: "one timestamp cache/data",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 201,
|
||||
max: 201,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
|
||||
expStats: makeStats(6),
|
||||
},
|
||||
{
|
||||
name: "one timestamp change type cache/data",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 202,
|
||||
max: 202,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}},
|
||||
expStats: makeStats(6),
|
||||
},
|
||||
{
|
||||
name: "one timestamp TSM/nodata",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 103,
|
||||
max: 103,
|
||||
},
|
||||
exp: nil,
|
||||
expStats: makeStats(12),
|
||||
},
|
||||
{
|
||||
name: "one timestamp cache/nodata",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 203,
|
||||
max: 203,
|
||||
},
|
||||
exp: nil,
|
||||
expStats: makeStats(6),
|
||||
},
|
||||
|
||||
// queries with predicates
|
||||
{
|
||||
name: "predicate/v10",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 0,
|
||||
max: 300,
|
||||
expr: `tag10 = 'v10'`,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}},
|
||||
expStats: makeStats(3),
|
||||
},
|
||||
{
|
||||
name: "predicate/v11",
|
||||
args: args{
|
||||
org: 0,
|
||||
m: "m00",
|
||||
min: 0,
|
||||
max: 300,
|
||||
expr: `tag10 = 'v11'`,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}},
|
||||
expStats: makeStats(3),
|
||||
},
|
||||
|
||||
// ***********************
|
||||
// * queries for the second org, which has no deleted data
|
||||
// ***********************
|
||||
{
|
||||
name: "all data",
|
||||
args: args{
|
||||
org: 1,
|
||||
m: "m10",
|
||||
min: 0,
|
||||
max: 1000,
|
||||
},
|
||||
exp: []cursors.MeasurementField{{Key: "barF", Type: cursors.Float}, {Key: "barS", Type: cursors.String}},
|
||||
expStats: makeStats(1),
|
||||
},
|
||||
|
||||
// ***********************
|
||||
// * other scenarios
|
||||
// ***********************
|
||||
{
|
||||
// ensure StringIterator is never nil
|
||||
name: "predicate/no candidate series",
|
||||
args: args{
|
||||
org: 1,
|
||||
m: "m10",
|
||||
min: 0,
|
||||
max: 1000,
|
||||
expr: `foo = 'nonexistent'`,
|
||||
},
|
||||
exp: nil,
|
||||
expStats: makeStats(0),
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(fmt.Sprintf("org%d/%s", tc.args.org, tc.name), func(t *testing.T) {
|
||||
a := tc.args
|
||||
var expr influxql.Expr
|
||||
if len(a.expr) > 0 {
|
||||
expr = influxql.MustParseExpr(a.expr)
|
||||
}
|
||||
|
||||
iter, err := e.MeasurementFields(context.Background(), orgs[a.org].org, orgs[a.org].bucket, a.m, a.min, a.max, expr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if got := cursors.MeasurementFieldsIteratorFlatMap(iter); !assert.ElementsMatch(t, tc.exp, got) {
|
||||
return
|
||||
}
|
||||
|
||||
if got := iter.Stats(); !assert.Equal(t, tc.expStats, got) {
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Verifies AddMeasurementToExpr amends the given influxql.Expr
|
||||
// with a predicate to restrict results to a single measurement
|
||||
func TestAddMeasurementToExpr(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue