From cb618efc65fa55d1fb70d84e5d3bfb991b235949 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 8 Apr 2020 16:15:34 -0700 Subject: [PATCH] feat(tsm1): Implementation of MeasurementFields This commit provides an implementation of the MeasurementFields API per the design previously outlined. --- tsdb/cursors/schema.go | 87 ++++++- tsdb/tsm1/engine_measurement_schema.go | 265 ++++++++++++++++++- tsdb/tsm1/engine_measurement_schema_test.go | 271 ++++++++++++++++++++ 3 files changed, 613 insertions(+), 10 deletions(-) diff --git a/tsdb/cursors/schema.go b/tsdb/cursors/schema.go index 1a2d713c64..014f00a3ea 100644 --- a/tsdb/cursors/schema.go +++ b/tsdb/cursors/schema.go @@ -4,11 +4,12 @@ package cursors type FieldType int const ( - Float FieldType = iota // means the data type is a float - Integer // means the data type is an integer - Unsigned // means the data type is an unsigned integer - Boolean // means the data type is a boolean - String // means the data type is a string of text + Float FieldType = iota // means the data type is a float + Integer // means the data type is an integer + Unsigned // means the data type is an unsigned integer + Boolean // means the data type is a boolean + String // means the data type is a string of text + Undefined // means the data type is unknown or undefined ) type MeasurementField struct { @@ -20,8 +21,8 @@ type MeasurementFields struct { Fields []MeasurementField } -type MeasurementFieldsCursor interface { - // Next advances the MeasurementFieldsCursor to the next value. It returns false +type MeasurementFieldsIterator interface { + // Next advances the iterator to the next value. It returns false // when there are no more values. Next() bool @@ -30,3 +31,75 @@ type MeasurementFieldsCursor interface { Stats() CursorStats } + +// EmptyMeasurementFieldsIterator is an implementation of MeasurementFieldsIterator that returns +// no values. 
+var EmptyMeasurementFieldsIterator = &measurementFieldsIterator{} + +type measurementFieldsIterator struct{} + +func (m *measurementFieldsIterator) Next() bool { return false } +func (m *measurementFieldsIterator) Value() MeasurementFields { return MeasurementFields{} } +func (m *measurementFieldsIterator) Stats() CursorStats { return CursorStats{} } + +type MeasurementFieldsSliceIterator struct { + f []MeasurementFields + v MeasurementFields + i int + stats CursorStats +} + +func NewMeasurementFieldsSliceIteratorWithStats(f []MeasurementFields, stats CursorStats) *MeasurementFieldsSliceIterator { + return &MeasurementFieldsSliceIterator{f: f, stats: stats} +} + +func (s *MeasurementFieldsSliceIterator) Next() bool { + if s.i < len(s.f) { + s.v = s.f[s.i] + s.i++ + return true + } + s.v = MeasurementFields{} + return false +} + +func (s *MeasurementFieldsSliceIterator) Value() MeasurementFields { + return s.v +} + +func (s *MeasurementFieldsSliceIterator) Stats() CursorStats { + return s.stats +} + +func (s *MeasurementFieldsSliceIterator) toSlice() []MeasurementFields { + if s.i < len(s.f) { + return s.f[s.i:] + } + return nil +} + +// MeasurementFieldsIteratorFlatMap reads the remainder of i, flattening the results +// to a single slice. +func MeasurementFieldsIteratorFlatMap(i MeasurementFieldsIterator) []MeasurementField { + if i == nil { + return nil + } + + var res []MeasurementField + if si, ok := i.(*MeasurementFieldsSliceIterator); ok { + s := si.toSlice() + sz := 0 + for i := range s { + sz += len(s[i].Fields) + } + res = make([]MeasurementField, 0, sz) + for i := range s { + res = append(res, s[i].Fields...) + } + } else { + for i.Next() { + res = append(res, i.Value().Fields...) 
+ } + } + return res +} diff --git a/tsdb/tsm1/engine_measurement_schema.go b/tsdb/tsm1/engine_measurement_schema.go index bc55c4d3bc..6ae39c6013 100644 --- a/tsdb/tsm1/engine_measurement_schema.go +++ b/tsdb/tsm1/engine_measurement_schema.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/cursors" + "github.com/influxdata/influxdb/v2/tsdb/seriesfile" "github.com/influxdata/influxql" "go.uber.org/zap" ) @@ -74,6 +75,8 @@ func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb. return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err() } + var ts cursors.TimestampArray + // With performance in mind, we explicitly do not check the context // while scanning the entries in the cache. prefixStr := string(prefix) @@ -94,10 +97,17 @@ func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb. return nil } - stats.ScannedValues += entry.values.Len() - stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp + ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0]) + if ts.Len() == 0 { + return nil + } - if entry.values.Contains(start, end) { + sort.Sort(&ts) + + stats.ScannedValues += ts.Len() + stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp + + if ts.Contains(start, end) { tsmValues[string(name)] = struct{}{} } return nil @@ -149,6 +159,248 @@ func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxd return e.tagKeysPredicate(ctx, orgID, bucketID, []byte(measurement), start, end, predicate) } +func (e *Engine) MeasurementFields(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) { + if predicate == nil { + return e.fieldsNoPredicate(ctx, orgID, bucketID, []byte(measurement), start, end) + } + + predicate = AddMeasurementToExpr(measurement, predicate) + + return 
e.fieldsPredicate(ctx, orgID, bucketID, []byte(measurement), start, end, predicate) +} + +type fieldTypeTime struct { + typ cursors.FieldType + max int64 +} + +func (e *Engine) fieldsPredicate(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, measurement []byte, start int64, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error) { + if err := ValidateTagPredicate(predicate); err != nil { + return nil, err + } + + orgBucket := tsdb.EncodeName(orgID, bucketID) + + keys, err := e.findCandidateKeys(ctx, orgBucket[:], predicate) + if err != nil { + return cursors.EmptyMeasurementFieldsIterator, err + } + + if len(keys) == 0 { + return cursors.EmptyMeasurementFieldsIterator, nil + } + + var files []TSMFile + defer func() { + for _, f := range files { + f.Unref() + } + }() + var iters []*TimeRangeMaxTimeIterator + + // TODO(edd): we need to clean up how we're encoding the prefix so that we + // don't have to remember to get it right everywhere we need to touch TSM data. 
+ orgBucketEsc := models.EscapeMeasurement(orgBucket[:]) + + mt := models.Tags{models.NewTag(models.MeasurementTagKeyBytes, measurement)} + tsmKeyPrefix := mt.AppendHashKey(orgBucketEsc) + + var canceled bool + + e.FileStore.ForEachFile(func(f TSMFile) bool { + // Check the context before accessing each tsm file + select { + case <-ctx.Done(): + canceled = true + return false + default: + } + if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(tsmKeyPrefix, tsmKeyPrefix) { + f.Ref() + files = append(files, f) + iters = append(iters, f.TimeRangeMaxTimeIterator(tsmKeyPrefix, start, end)) + } + return true + }) + + var stats cursors.CursorStats + + if canceled { + stats = statsFromTimeRangeMaxTimeIters(stats, iters) + return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err() + } + + tsmValues := make(map[string]fieldTypeTime) + + // reusable buffers + var ( + tags models.Tags + keybuf []byte + sfkey []byte + ts cursors.TimestampArray + ) + + for i := range keys { + // to keep cache scans fast, check context every 'cancelCheckInterval' iterations + if i%cancelCheckInterval == 0 { + select { + case <-ctx.Done(): + stats = statsFromTimeRangeMaxTimeIters(stats, iters) + return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err() + default: + } + } + + _, tags = seriesfile.ParseSeriesKeyInto(keys[i], tags[:0]) + fieldKey := tags.Get(models.FieldKeyTagKeyBytes) + keybuf = models.AppendMakeKey(keybuf[:0], orgBucketEsc, tags) + sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, fieldKey) + + cur := fieldTypeTime{max: InvalidMinNanoTime} + + ts.Timestamps = e.Cache.AppendTimestamps(sfkey, ts.Timestamps[:0]) + if ts.Len() > 0 { + sort.Sort(&ts) + + stats.ScannedValues += ts.Len() + stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp + + if ts.Contains(start, end) { + max := ts.MaxTime() + if max > cur.max { + cur.max = max + cur.typ = BlockTypeToFieldType(e.Cache.BlockType(sfkey)) + } + } + } + + for _, iter := range 
iters { + if exact, _ := iter.Seek(sfkey); !exact { + continue + } + + max := iter.MaxTime() + if max > cur.max { + cur.max = max + cur.typ = BlockTypeToFieldType(iter.Type()) + } + } + + if cur.max != InvalidMinNanoTime { + tsmValues[string(fieldKey)] = cur + } + } + + vals := make([]cursors.MeasurementField, 0, len(tsmValues)) + for key, val := range tsmValues { + vals = append(vals, cursors.MeasurementField{Key: key, Type: val.typ}) + } + + return cursors.NewMeasurementFieldsSliceIteratorWithStats([]cursors.MeasurementFields{{Fields: vals}}, stats), nil +} + +func (e *Engine) fieldsNoPredicate(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, measurement []byte, start int64, end int64) (cursors.MeasurementFieldsIterator, error) { + tsmValues := make(map[string]fieldTypeTime) + orgBucket := tsdb.EncodeName(orgID, bucketID) + + // TODO(edd): we need to clean up how we're encoding the prefix so that we + // don't have to remember to get it right everywhere we need to touch TSM data. 
+ orgBucketEsc := models.EscapeMeasurement(orgBucket[:]) + + mt := models.Tags{models.NewTag(models.MeasurementTagKeyBytes, measurement)} + tsmKeyPrefix := mt.AppendHashKey(orgBucketEsc) + + var stats cursors.CursorStats + var canceled bool + + e.FileStore.ForEachFile(func(f TSMFile) bool { + // Check the context before touching each tsm file + select { + case <-ctx.Done(): + canceled = true + return false + default: + } + if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(tsmKeyPrefix, tsmKeyPrefix) { + // TODO(sgc): create f.TimeRangeIterator(minKey, maxKey, start, end) + iter := f.TimeRangeMaxTimeIterator(tsmKeyPrefix, start, end) + for i := 0; iter.Next(); i++ { + sfkey := iter.Key() + if !bytes.HasPrefix(sfkey, tsmKeyPrefix) { + // end of prefix + break + } + + max := iter.MaxTime() + if max == InvalidMinNanoTime { + continue + } + + _, fieldKey := SeriesAndFieldFromCompositeKey(sfkey) + v, ok := tsmValues[string(fieldKey)] + if !ok || v.max < max { + tsmValues[string(fieldKey)] = fieldTypeTime{ + typ: BlockTypeToFieldType(iter.Type()), + max: max, + } + } + } + stats.Add(iter.Stats()) + } + return true + }) + + if canceled { + return cursors.NewMeasurementFieldsSliceIteratorWithStats(nil, stats), ctx.Err() + } + + var ts cursors.TimestampArray + + // With performance in mind, we explicitly do not check the context + // while scanning the entries in the cache. + tsmKeyPrefixStr := string(tsmKeyPrefix) + _ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error { + if !strings.HasPrefix(sfkey, tsmKeyPrefixStr) { + return nil + } + + ts.Timestamps = entry.AppendTimestamps(ts.Timestamps[:0]) + if ts.Len() == 0 { + return nil + } + + sort.Sort(&ts) + + stats.ScannedValues += ts.Len() + stats.ScannedBytes += ts.Len() * 8 // sizeof timestamp + + if !ts.Contains(start, end) { + return nil + } + + max := ts.MaxTime() + + // TODO(edd): consider the []byte() conversion here. 
+ _, fieldKey := SeriesAndFieldFromCompositeKey([]byte(sfkey)) + v, ok := tsmValues[string(fieldKey)] + if !ok || v.max < max { + tsmValues[string(fieldKey)] = fieldTypeTime{ + typ: BlockTypeToFieldType(entry.BlockType()), + max: max, + } + } + + return nil + }) + + vals := make([]cursors.MeasurementField, 0, len(tsmValues)) + for key, val := range tsmValues { + vals = append(vals, cursors.MeasurementField{Key: key, Type: val.typ}) + } + + return cursors.NewMeasurementFieldsSliceIteratorWithStats([]cursors.MeasurementFields{{Fields: vals}}, stats), nil +} + func AddMeasurementToExpr(measurement string, base influxql.Expr) influxql.Expr { // \x00 = '' expr := &influxql.BinaryExpr{ @@ -175,3 +427,10 @@ func AddMeasurementToExpr(measurement string, base influxql.Expr) influxql.Expr return expr } + +func statsFromTimeRangeMaxTimeIters(stats cursors.CursorStats, iters []*TimeRangeMaxTimeIterator) cursors.CursorStats { + for _, iter := range iters { + stats.Add(iter.Stats()) + } + return stats +} diff --git a/tsdb/tsm1/engine_measurement_schema_test.go b/tsdb/tsm1/engine_measurement_schema_test.go index 9f0a59905f..726971e9cb 100644 --- a/tsdb/tsm1/engine_measurement_schema_test.go +++ b/tsdb/tsm1/engine_measurement_schema_test.go @@ -789,6 +789,277 @@ mem,mem1=v,mem2=v f=1 201`) } } +func TestEngine_MeasurementFields(t *testing.T) { + e, err := NewEngine(tsm1.NewConfig(), t) + if err != nil { + t.Fatal(err) + } + if err := e.Open(context.Background()); err != nil { + t.Fatal(err) + } + defer e.Close() + + orgs := []struct { + org, bucket influxdb.ID + }{ + { + org: 0x5020, + bucket: 0x5100, + }, + { + org: 0x6000, + bucket: 0x6100, + }, + } + + // this org will require escaping the 0x20 byte + e.MustWritePointsString(orgs[0].org, orgs[0].bucket, ` +m00,tag00=v00,tag10=v10 i=1i 101 +m00,tag00=v00,tag10=v10 i=1i 102 +m00,tag00=v00,tag10=v10 f=1 101 +m00,tag00=v00,tag10=v10 i=1i 108 +m00,tag00=v00,tag10=v10 f=1 109 +m00,tag00=v00,tag10=v10 i=1i 109 +m01,tag00=v00,tag10=v10 
b=true 101 +`) + e.MustWritePointsString(orgs[1].org, orgs[1].bucket, ` +m10,foo=v barF=50 101 +`) + + // send some points to TSM data + e.MustWriteSnapshot() + + // delete some data from the first bucket + e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 105) + + // leave some points in the cache + e.MustWritePointsString(orgs[0].org, orgs[0].bucket, ` +m00,tag00=v00,tag10=v10 i=2i 201 +m00,tag00=v00,tag10=v10 i=2i 202 +m00,tag00=v00,tag10=v10 f=2 201 +m00,tag00=v00,tag10=v11 i="s" 202 +m00,tag00=v00,tag10=v11 i="s" 208 +m00,tag00=v00,tag10=v11 i="s" 209 +m01,tag00=v00,tag10=v10 b=true 201 +`) + e.MustWritePointsString(orgs[1].org, orgs[1].bucket, ` +m10,foo=v barS="60" 501 +`) + + type args struct { + org int + m string + min, max int64 + expr string + } + + makeStats := func(v int) cursors.CursorStats { + return cursors.CursorStats{ + ScannedValues: v, + ScannedBytes: v * 8, + } + } + + var tests = []struct { + name string + args args + exp []cursors.MeasurementField + expStats cursors.CursorStats + }{ + // *********************** + // * queries for the first org, which has some deleted data + // *********************** + { + name: "TSM and cache", + args: args{ + org: 0, + m: "m00", + min: 0, + max: 300, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}, {Key: "f", Type: cursors.Float}}, + expStats: makeStats(12), + }, + { + name: "m00 only TSM", + args: args{ + org: 0, + m: "m00", + min: 0, + max: 199, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}}, + expStats: makeStats(12), + }, + { + name: "m01 all time", + args: args{ + org: 0, + m: "m01", + min: 0, + max: 1000, + }, + exp: []cursors.MeasurementField{{Key: "b", Type: cursors.Boolean}}, + expStats: makeStats(1), + }, + { + name: "m10 only TSM", + args: args{ + org: 1, + m: "m10", + min: 0, + max: 199, + }, + exp: []cursors.MeasurementField{{Key: "barF", Type: cursors.Float}}, + expStats: makeStats(1), + }, + { + name: 
"only cache", + args: args{ + org: 0, + m: "m00", + min: 200, + max: 299, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}, {Key: "f", Type: cursors.Float}}, + expStats: makeStats(6), + }, + { + name: "one timestamp TSM/data", + args: args{ + org: 0, + m: "m00", + min: 109, + max: 109, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}}, + expStats: makeStats(6), + }, + { + name: "one timestamp cache/data", + args: args{ + org: 0, + m: "m00", + min: 201, + max: 201, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}}, + expStats: makeStats(6), + }, + { + name: "one timestamp change type cache/data", + args: args{ + org: 0, + m: "m00", + min: 202, + max: 202, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}}, + expStats: makeStats(6), + }, + { + name: "one timestamp TSM/nodata", + args: args{ + org: 0, + m: "m00", + min: 103, + max: 103, + }, + exp: nil, + expStats: makeStats(12), + }, + { + name: "one timestamp cache/nodata", + args: args{ + org: 0, + m: "m00", + min: 203, + max: 203, + }, + exp: nil, + expStats: makeStats(6), + }, + + // queries with predicates + { + name: "predicate/v10", + args: args{ + org: 0, + m: "m00", + min: 0, + max: 300, + expr: `tag10 = 'v10'`, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.Integer}, {Key: "f", Type: cursors.Float}}, + expStats: makeStats(3), + }, + { + name: "predicate/v11", + args: args{ + org: 0, + m: "m00", + min: 0, + max: 300, + expr: `tag10 = 'v11'`, + }, + exp: []cursors.MeasurementField{{Key: "i", Type: cursors.String}}, + expStats: makeStats(3), + }, + + // *********************** + // * queries for the second org, which has no deleted data + // *********************** + { + name: "all data", + args: args{ + org: 1, + m: "m10", + min: 0, + max: 1000, + }, + exp: []cursors.MeasurementField{{Key: "barF", Type: cursors.Float}, {Key: "barS", 
Type: cursors.String}}, + expStats: makeStats(1), + }, + + // *********************** + // * other scenarios + // *********************** + { + // ensure MeasurementFieldsIterator is never nil + name: "predicate/no candidate series", + args: args{ + org: 1, + m: "m10", + min: 0, + max: 1000, + expr: `foo = 'nonexistent'`, + }, + exp: nil, + expStats: makeStats(0), + }, + } + for _, tc := range tests { + t.Run(fmt.Sprintf("org%d/%s", tc.args.org, tc.name), func(t *testing.T) { + a := tc.args + var expr influxql.Expr + if len(a.expr) > 0 { + expr = influxql.MustParseExpr(a.expr) + } + + iter, err := e.MeasurementFields(context.Background(), orgs[a.org].org, orgs[a.org].bucket, a.m, a.min, a.max, expr) + assert.NoError(t, err) + + if got := cursors.MeasurementFieldsIteratorFlatMap(iter); !assert.ElementsMatch(t, tc.exp, got) { + return + } + + if got := iter.Stats(); !assert.Equal(t, tc.expStats, got) { + return + } + }) + } +} + // Verifies AddMeasurementToExpr amends the given influxql.Expr // with a predicate to restrict results to a single measurement func TestAddMeasurementToExpr(t *testing.T) {