Merge pull request #18894 from influxdata/sgc/issues/influxql_schema

feat(storage): InfluxQL schema APIs without time range
pull/18903/head
Stuart Carnie 2020-07-09 10:47:25 -07:00 committed by GitHub
commit dd98d65bac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 690 additions and 3 deletions

View File

@ -0,0 +1,60 @@
package storage
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/influxdata/influxql"
)
// MeasurementNamesNoTime returns an iterator which enumerates the measurements for the given
// bucket.
//
// MeasurementNamesNoTime will always return a StringIterator if there is no error.
//
// If the context is canceled before MeasurementNamesNoTime has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementNamesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, predicate influxql.Expr) (cursors.StringIterator, error) {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return cursors.EmptyStringIterator, nil
}
return e.engine.MeasurementNamesNoTime(ctx, orgID, bucketID, predicate)
}
// MeasurementTagValuesNoTime returns an iterator which enumerates the tag values for the given
// bucket, measurement and tag key and filtered using the optional the predicate.
//
// MeasurementTagValuesNoTime will always return a StringIterator if there is no error.
//
// If the context is canceled before MeasurementTagValuesNoTime has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementTagValuesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, predicate influxql.Expr) (cursors.StringIterator, error) {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return cursors.EmptyStringIterator, nil
}
return e.engine.MeasurementTagValuesNoTime(ctx, orgID, bucketID, measurement, tagKey, predicate)
}
// MeasurementFieldsNoTime returns an iterator which enumerates the field schema for the given
// bucket and measurement, filtered using the optional the predicate.
//
// MeasurementFieldsNoTime will always return a MeasurementFieldsIterator if there is no error.
//
// If the context is canceled before MeasurementFieldsNoTime has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementFieldsNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return cursors.EmptyMeasurementFieldsIterator, nil
}
return e.engine.MeasurementFieldsNoTime(ctx, orgID, bucketID, measurement, predicate)
}

View File

@ -1,6 +1,9 @@
package cursors
import "github.com/influxdata/influxql"
import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxql"
)
// FieldType represents the primitive field data types available in tsm.
type FieldType int
@ -37,6 +40,25 @@ func FieldTypeToDataType(ft FieldType) influxql.DataType {
// current value. Undefined has the lowest precedence.
func (ft FieldType) IsLower(other FieldType) bool { return other < ft }
var (
modelsFieldTypeToFieldTypeMapping = [8]FieldType{
models.Integer: Integer,
models.Float: Float,
models.Boolean: Boolean,
models.String: String,
models.Empty: Undefined,
models.Unsigned: Unsigned,
6: Undefined,
7: Undefined,
}
)
// ModelsFieldTypeToFieldType returns the equivalent FieldType for ft.
// If ft is an invalid FieldType, the results are undefined.
func ModelsFieldTypeToFieldType(ft models.FieldType) FieldType {
return modelsFieldTypeToFieldTypeMapping[ft&7]
}
type MeasurementField struct {
Key string // Key is the name of the field
Type FieldType // Type is field type
@ -119,6 +141,10 @@ type MeasurementFieldsSliceIterator struct {
stats CursorStats
}
func NewMeasurementFieldsSliceIterator(f []MeasurementFields) *MeasurementFieldsSliceIterator {
return &MeasurementFieldsSliceIterator{f: f}
}
func NewMeasurementFieldsSliceIteratorWithStats(f []MeasurementFields, stats CursorStats) *MeasurementFieldsSliceIterator {
return &MeasurementFieldsSliceIterator{f: f, stats: stats}
}

View File

@ -0,0 +1,242 @@
package tsm1
import (
"context"
"sort"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/influxdata/influxql"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
)
// MeasurementNamesNoTime returns an iterator which enumerates the measurements for the given
// bucket.
//
// MeasurementNamesNoTime will always return a StringIterator if there is no error.
//
// If the context is canceled before MeasurementNamesNoTime has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementNamesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, predicate influxql.Expr) (cursors.StringIterator, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
return e.tagValuesNoTime(ctx, orgID, bucketID, models.MeasurementTagKeyBytes, predicate)
}
// MeasurementTagValuesNoTime returns an iterator which enumerates the tag values for the given
// bucket, measurement and tag key and filtered using the optional the predicate.
//
// MeasurementTagValuesNoTime will always return a StringIterator if there is no error.
//
// If the context is canceled before MeasurementTagValuesNoTime has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementTagValuesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, predicate influxql.Expr) (cursors.StringIterator, error) {
predicate = AddMeasurementToExpr(measurement, predicate)
return e.tagValuesNoTime(ctx, orgID, bucketID, []byte(tagKey), predicate)
}
func (e *Engine) tagValuesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, tagKeyBytes []byte, predicate influxql.Expr) (cursors.StringIterator, error) {
if err := ValidateTagPredicate(predicate); err != nil {
return nil, err
}
orgBucket := tsdb.EncodeName(orgID, bucketID)
// fetch distinct values for tag key in bucket
itr, err := e.index.TagValueIterator(orgBucket[:], tagKeyBytes)
if err != nil {
return nil, err
} else if itr == nil {
return cursors.NewStringSliceIterator(nil), err
}
defer itr.Close()
var (
vals = make([]string, 0, 128)
)
span := opentracing.SpanFromContext(ctx)
if span != nil {
defer func() {
span.LogFields(
log.Int("values_count", len(vals)),
)
}()
}
// reusable buffers
var (
tagKey = string(tagKeyBytes)
)
for i := 0; ; i++ {
// to keep cache scans fast, check context every 'cancelCheckInterval' iterations
if i%cancelCheckInterval == 0 {
select {
case <-ctx.Done():
return cursors.NewStringSliceIterator(nil), ctx.Err()
default:
}
}
val, err := itr.Next()
if err != nil {
return cursors.NewStringSliceIterator(nil), err
} else if len(val) == 0 {
break
}
// <tagKey> = val
var expr influxql.Expr = &influxql.BinaryExpr{
LHS: &influxql.VarRef{Val: tagKey, Type: influxql.Tag},
Op: influxql.EQ,
RHS: &influxql.StringLiteral{Val: string(val)},
}
if predicate != nil {
// <tagKey> = val AND (expr)
expr = &influxql.BinaryExpr{
LHS: expr,
Op: influxql.AND,
RHS: &influxql.ParenExpr{
Expr: predicate,
},
}
}
if err := func() error {
sitr, err := e.index.MeasurementSeriesByExprIterator(orgBucket[:], expr)
if err != nil {
return err
}
defer sitr.Close()
if elem, err := sitr.Next(); err != nil {
return err
} else if !elem.SeriesID.IsZero() {
vals = append(vals, string(val))
}
return nil
}(); err != nil {
return cursors.NewStringSliceIterator(nil), err
}
}
sort.Strings(vals)
return cursors.NewStringSliceIterator(vals), err
}
// MeasurementFieldsNoTime returns an iterator which enumerates the field schema for the given
// bucket and measurement, filtered using the optional the predicate.
//
// MeasurementFieldsNoTime will always return a MeasurementFieldsIterator if there is no error.
//
// If the context is canceled before MeasurementFieldsNoTime has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementFieldsNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
predicate = AddMeasurementToExpr(measurement, predicate)
return e.fieldsNoTime(ctx, orgID, bucketID, []byte(measurement), predicate)
}
func (e *Engine) fieldsNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement []byte, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) {
type fieldKeyType struct {
key []byte
typ cursors.FieldType
}
if err := ValidateTagPredicate(predicate); err != nil {
return nil, err
}
orgBucket := tsdb.EncodeName(orgID, bucketID)
// fetch distinct values for field, which may be a superset of the measurement
itr, err := e.index.TagValueIterator(orgBucket[:], models.FieldKeyTagKeyBytes)
if err != nil {
return nil, err
}
defer itr.Close()
var (
fieldTypes = make([]fieldKeyType, 0, 128)
)
span := opentracing.SpanFromContext(ctx)
if span != nil {
defer func() {
span.LogFields(
log.Int("values_count", len(fieldTypes)),
)
}()
}
for i := 0; ; i++ {
// to keep cache scans fast, check context every 'cancelCheckInterval' iterations
if i%cancelCheckInterval == 0 {
select {
case <-ctx.Done():
return cursors.NewMeasurementFieldsSliceIterator(nil), ctx.Err()
default:
}
}
val, err := itr.Next()
if err != nil {
return cursors.NewMeasurementFieldsSliceIterator(nil), err
} else if len(val) == 0 {
break
}
// <tagKey> = val
var expr influxql.Expr = &influxql.BinaryExpr{
LHS: &influxql.VarRef{Val: models.FieldKeyTagKey, Type: influxql.Tag},
Op: influxql.EQ,
RHS: &influxql.StringLiteral{Val: string(val)},
}
if predicate != nil {
// <tagKey> = val AND (expr)
expr = &influxql.BinaryExpr{
LHS: expr,
Op: influxql.AND,
RHS: &influxql.ParenExpr{
Expr: predicate,
},
}
}
if err := func() error {
sitr, err := e.index.MeasurementSeriesByExprIterator(orgBucket[:], expr)
if err != nil {
return err
}
defer sitr.Close()
if elem, err := sitr.Next(); err != nil {
return err
} else if !elem.SeriesID.IsZero() {
key := e.sfile.SeriesKey(elem.SeriesID)
typedID := e.sfile.SeriesIDTypedBySeriesKey(key)
fieldTypes = append(fieldTypes, fieldKeyType{key: val, typ: cursors.ModelsFieldTypeToFieldType(typedID.Type())})
}
return nil
}(); err != nil {
return cursors.NewMeasurementFieldsSliceIterator(nil), err
}
}
vals := make([]cursors.MeasurementField, 0, len(fieldTypes))
for i := range fieldTypes {
val := &fieldTypes[i]
vals = append(vals, cursors.MeasurementField{Key: string(val.key), Type: val.typ, Timestamp: 0})
}
return cursors.NewMeasurementFieldsSliceIterator([]cursors.MeasurementFields{{Fields: vals}}), nil
}

View File

@ -0,0 +1,359 @@
package tsm1_test
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func strL(s ...string) []string { return s }
func TestEngine_MeasurementNamesNoTime(t *testing.T) {
e, err := NewEngine(tsm1.NewConfig(), t)
if err != nil {
t.Fatal(err)
}
if err := e.Open(context.Background()); err != nil {
t.Fatal(err)
}
defer e.Close()
orgs := []struct {
org, bucket influxdb.ID
}{
{
org: 0x5020,
bucket: 0x5100,
},
{
org: 0x6000,
bucket: 0x6100,
},
}
// this org will require escaping the 0x20 byte
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
cpu,cpu0=v,cpu1=v,cpu2=v f=1 101
cpu,cpu1=v f=1 103
cpu,cpu2=v f=1 105
cpu,cpu0=v,cpu2=v f=1 107
cpu,cpu2=v,cpu3=v,other=c f=1 109
mem,mem0=v,mem1=v,other=m f=1 101`)
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
cpu2,cpu0=v,cpu1=v,cpu2=v f=1 101
cpu2,cpu1=v f=1 103
cpu2,cpu2=v f=1 105
cpu2,cpu0=v,cpu2=v f=1 107
cpu2,cpu2=v,cpu3=v,other=c f=1 109
mem2,mem0=v,mem1=v,other=m f=1 101`)
// this test verifies the index is immediately queryable before TSM is written
t.Run("gets all measurements before snapshot", func(t *testing.T) {
iter, err := e.MeasurementNamesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("cpu", "mem"))
})
// this test verifies the index is immediately queryable before TSM is written
t.Run("verify subset of measurements with predicate", func(t *testing.T) {
iter, err := e.MeasurementNamesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, influxql.MustParseExpr("other = 'c'"))
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("cpu"))
})
// delete some data from the first bucket
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 105)
// this test verifies measurement disappears if deleted whilst in cache
t.Run("only contains cpu measurement", func(t *testing.T) {
iter, err := e.MeasurementNamesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("cpu"))
})
// write the values back
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
cpu,cpu0=v,cpu1=v,cpu2=v f=1 101
cpu,cpu1=v f=1 103
cpu,cpu2=v f=1 105
mem,mem0=v,mem1=v,other=m f=1 101`)
// send some points to TSM data
e.MustWriteSnapshot()
// this test verifies the index is immediately queryable before TSM is written
t.Run("contains cpu and mem measurement in TSM", func(t *testing.T) {
iter, err := e.MeasurementNamesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("cpu", "mem"))
})
// delete some data from the first bucket
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 105)
// this test verifies measurement disappears if deleted from TSM
t.Run("only contains cpu measurement in TSM", func(t *testing.T) {
iter, err := e.MeasurementNamesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("cpu"))
})
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 1000)
// this test verifies all measurements disappears if deleted
t.Run("no measurements", func(t *testing.T) {
iter, err := e.MeasurementNamesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL())
})
}
func TestEngine_MeasurementTagValuesNoTime(t *testing.T) {
e, err := NewEngine(tsm1.NewConfig(), t)
if err != nil {
t.Fatal(err)
}
if err := e.Open(context.Background()); err != nil {
t.Fatal(err)
}
defer e.Close()
orgs := []struct {
org, bucket influxdb.ID
}{
{
org: 0x5020,
bucket: 0x5100,
},
{
org: 0x6000,
bucket: 0x6100,
},
}
// this org will require escaping the 0x20 byte
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
cpuA,host=0A,os=linux value=1.1 101
cpuA,host=AA,os=linux value=1.2 102
cpuA,host=AA,os=linux value=1.3 104
cpuA,host=CA,os=linux value=1.3 104
cpuA,host=CA,os=linux value=1.3 105
cpuA,host=DA,os=macOS value=1.3 106
memA,host=DA,os=macOS value=1.3 101`)
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
cpuB,host=0B,os=linux value=1.1 101
cpuB,host=AB,os=linux value=1.2 102
cpuB,host=AB,os=linux value=1.3 104
cpuB,host=CB,os=linux value=1.3 104
cpuB,host=CB,os=linux value=1.3 105
cpuB,host=DB,os=macOS value=1.3 106
memB,host=DB,os=macOS value=1.3 101`)
t.Run("before snapshot", func(t *testing.T) {
t.Run("cpuA", func(t *testing.T) {
t.Run("host tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("0A", "AA", "CA", "DA"))
})
t.Run("host tag returns subset with predicate", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", influxql.MustParseExpr("os = 'macOS'"))
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("DA"))
})
})
t.Run("memA", func(t *testing.T) {
t.Run("host tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "memA", "host", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("DA"))
})
t.Run("os tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "memA", "os", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("macOS"))
})
})
})
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 102, 105)
t.Run("before snapshot after delete", func(t *testing.T) {
t.Run("cpuA", func(t *testing.T) {
t.Run("host tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("0A", "DA"))
})
t.Run("host tag returns subset with predicate", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", influxql.MustParseExpr("os = 'macOS'"))
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("DA"))
})
})
t.Run("memA", func(t *testing.T) {
t.Run("host tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "memA", "host", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("DA"))
})
t.Run("os tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "memA", "os", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("macOS"))
})
})
})
// send some points to TSM data
e.MustWriteSnapshot()
// leave some points in the cache
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
cpuA,host=0A,os=linux value=1.1 201
cpuA,host=AA,os=linux value=1.2 202
cpuA,host=AA,os=linux value=1.3 204
cpuA,host=BA,os=macOS value=1.3 204
cpuA,host=BA,os=macOS value=1.3 205
cpuA,host=EA,os=linux value=1.3 206
memA,host=EA,os=linux value=1.3 201`)
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
cpuB,host=0B,os=linux value=1.1 201
cpuB,host=AB,os=linux value=1.2 202
cpuB,host=AB,os=linux value=1.3 204
cpuB,host=BB,os=linux value=1.3 204
cpuB,host=BB,os=linux value=1.3 205
cpuB,host=EB,os=macOS value=1.3 206
memB,host=EB,os=macOS value=1.3 201`)
t.Run("after snapshot", func(t *testing.T) {
t.Run("cpuA", func(t *testing.T) {
t.Run("host tag returns all values", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("0A", "AA", "BA", "DA", "EA"))
})
t.Run("host tag returns subset with predicate", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", influxql.MustParseExpr("os = 'macOS'"))
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL("BA", "DA"))
})
})
})
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 1000)
t.Run("returns no data after deleting everything", func(t *testing.T) {
iter, err := e.MeasurementTagValuesNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "cpuA", "host", nil)
require.NoError(t, err)
assert.Equal(t, cursors.StringIteratorToSlice(iter), strL())
})
}
func TestEngine_MeasurementFieldsNoTime(t *testing.T) {
e, err := NewEngine(tsm1.NewConfig(), t)
if err != nil {
t.Fatal(err)
}
if err := e.Open(context.Background()); err != nil {
t.Fatal(err)
}
defer e.Close()
orgs := []struct {
org, bucket influxdb.ID
}{
{
org: 0x5020,
bucket: 0x5100,
},
{
org: 0x6000,
bucket: 0x6100,
},
}
// this org will require escaping the 0x20 byte
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
m00,tag00=v00,tag10=v10 i=1i 101
m00,tag00=v00,tag10=v11 i=1i 102
m00,tag00=v00,tag10=v12 f=1 101
m00,tag00=v00,tag10=v13 i=1i 108
m00,tag00=v00,tag10=v14 f=1 109
m00,tag00=v00,tag10=v15 i=1i 109
m01,tag00=v00,tag10=v10 b=true 101
`)
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
m10,foo=v barF=50 101
`)
fldL := func(t *testing.T, kv ...interface{}) []cursors.MeasurementField {
t.Helper()
if len(kv)&1 == 1 {
panic("uneven kv slice")
}
res := make([]cursors.MeasurementField, 0, len(kv)/2)
for i := 0; i < len(kv); i += 2 {
res = append(res, cursors.MeasurementField{
Key: kv[i].(string),
Type: kv[i+1].(cursors.FieldType),
})
}
return res
}
t.Run("first writes", func(t *testing.T) {
t.Run("m00 no predicate", func(t *testing.T) {
iter, err := e.MeasurementFieldsNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "m00", nil)
require.NoError(t, err)
assert.Equal(t, cursors.MeasurementFieldsIteratorFlatMap(iter), fldL(t, "f", cursors.Float, "i", cursors.Integer))
})
t.Run("m00 with predicate", func(t *testing.T) {
iter, err := e.MeasurementFieldsNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "m00", influxql.MustParseExpr("tag10 = 'v15'"))
require.NoError(t, err)
assert.Equal(t, cursors.MeasurementFieldsIteratorFlatMap(iter), fldL(t, "i", cursors.Integer))
})
t.Run("m01 no predicate", func(t *testing.T) {
iter, err := e.MeasurementFieldsNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "m01", nil)
require.NoError(t, err)
assert.Equal(t, cursors.MeasurementFieldsIteratorFlatMap(iter), fldL(t, "b", cursors.Boolean))
})
})
// change type of field i (which is not expected, and won't be supported in the future)
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
m00,tag00=v00,tag10=v22 f=1 201
m00,tag00=v00,tag10=v21 i="s" 202
m00,tag00=v00,tag10=v20 b=true 210
`)
t.Run("i is still integer", func(t *testing.T) {
iter, err := e.MeasurementFieldsNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "m00", nil)
require.NoError(t, err)
assert.Equal(t, cursors.MeasurementFieldsIteratorFlatMap(iter), fldL(t, "b", cursors.Boolean, "f", cursors.Float, "i", cursors.Integer))
})
// delete earlier data
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 200)
t.Run("i is now a string", func(t *testing.T) {
iter, err := e.MeasurementFieldsNoTime(context.Background(), orgs[0].org, orgs[0].bucket, "m00", nil)
require.NoError(t, err)
assert.Equal(t, cursors.MeasurementFieldsIteratorFlatMap(iter), fldL(t, "b", cursors.Boolean, "f", cursors.Float, "i", cursors.String))
})
}

View File

@ -264,7 +264,7 @@ func (e *Engine) measurementNamesPredicate(ctx context.Context, orgID, bucketID
//
// MeasurementTagValues will always return a StringIterator if there is no error.
//
// If the context is canceled before TagValues has finished processing, a non-nil
// If the context is canceled before MeasurementTagValues has finished processing, a non-nil
// error will be returned along with statistics for the already scanned data.
func (e *Engine) MeasurementTagValues(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
if predicate == nil {
@ -297,7 +297,7 @@ func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxd
// MeasurementFields returns an iterator which enumerates the field schema for the given
// bucket and measurement, filtered using the optional the predicate and limited to the
//// time range [start, end].
// time range [start, end].
//
// MeasurementFields will always return a MeasurementFieldsIterator if there is no error.
//