feat: Teach storage engine how to find tag values for a given key

The TagValues API will perform a linear scan if there is no predicate;
otherwise, it will use the index to find a list of candidate series
keys.

TagValues expects the predicate to be transformed such that
`_measurement` and `_field` are remapped to `\x00` and `\xff`
respectively.

There is one TODO marked to analyze the predicate for a
`\x00 = '<measurement>'` pattern. If found, the predicate can be
eliminated and fall back to a linear prefix scan by combining the org,
bucket and measurement.
pull/13426/head
Stuart Carnie 2019-04-17 17:02:38 -07:00
parent 35e0094a28
commit d3790aa072
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
7 changed files with 637 additions and 0 deletions

View File

@ -1337,6 +1337,15 @@ func SeriesFieldKeyBytes(seriesKey, field string) []byte {
return b
}
// AppendSeriesFieldKeyBytes combines seriesKey and field such
// that can be used to search a TSM index. The value is appended to dst and
// the extended buffer returned.
func AppendSeriesFieldKeyBytes(dst, seriesKey, field []byte) []byte {
dst = append(dst, seriesKey...)
dst = append(dst, KeyFieldSeparatorBytes...)
return append(dst, field...)
}
var (
blockToFieldType = [8]influxql.DataType{
BlockFloat64: influxql.Float,

256
tsdb/tsm1/engine_schema.go Normal file
View File

@ -0,0 +1,256 @@
package tsm1
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxql"
)
// TagValues returns an iterator which enumerates the values for the specific
// tagKey in the given bucket matching the predicate within the
// time range (start, end].
func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
encoded := tsdb.EncodeName(orgID, bucketID)
var tagKeyBytes []byte
if tagKey == "_measurement" {
tagKeyBytes = models.MeasurementTagKeyBytes
} else if tagKey == "_field" {
tagKeyBytes = models.FieldKeyTagKeyBytes
} else {
tagKeyBytes = []byte(tagKey)
}
if predicate == nil {
return e.tagValuesNoPredicate(ctx, encoded[:], tagKeyBytes, start, end)
}
return e.tagValuesPredicate(ctx, encoded[:], tagKeyBytes, start, end, predicate)
}
func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgBucket, tagKeyBytes []byte, start, end int64) (cursors.StringIterator, error) {
tsmValues := make(map[string]struct{})
var tags models.Tags
// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
prefix := models.EscapeMeasurement(orgBucket)
// TODO(sgc): extend prefix when filtering by \x00 == <measurement>
e.FileStore.ForEachFile(func(f TSMFile) bool {
if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(prefix, prefix) {
// TODO(sgc): create f.TimeRangeIterator(minKey, maxKey, start, end)
iter := f.TimeRangeIterator(prefix, start, end)
for i := 0; iter.Next(); i++ {
sfkey := iter.Key()
if !bytes.HasPrefix(sfkey, prefix) {
// end of org+bucket
break
}
key, _ := SeriesAndFieldFromCompositeKey(sfkey)
_, tags = models.ParseKeyBytesWithTags(key, tags[:0])
curVal := tags.Get(tagKeyBytes)
if len(curVal) == 0 {
continue
}
if _, ok := tsmValues[string(curVal)]; ok {
continue
}
if iter.HasData() {
tsmValues[string(curVal)] = struct{}{}
}
}
}
return true
})
_ = e.Cache.ApplyEntryFn(func(sfkey []byte, entry *entry) error {
if !bytes.HasPrefix(sfkey, prefix) {
return nil
}
key, _ := SeriesAndFieldFromCompositeKey(sfkey)
_, tags = models.ParseKeyBytesWithTags(key, tags[:0])
curVal := tags.Get(tagKeyBytes)
if len(curVal) == 0 {
return nil
}
if _, ok := tsmValues[string(curVal)]; ok {
return nil
}
if entry.values.Contains(start, end) {
tsmValues[string(curVal)] = struct{}{}
}
return nil
})
vals := make([]string, 0, len(tsmValues))
for val := range tsmValues {
vals = append(vals, val)
}
sort.Strings(vals)
return cursors.NewStringSliceIterator(vals), nil
}
func (e *Engine) tagValuesPredicate(ctx context.Context, orgBucket, tagKeyBytes []byte, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
if err := ValidateTagPredicate(predicate); err != nil {
return nil, err
}
keys, err := e.findCandidateKeys(ctx, orgBucket, predicate)
if err != nil {
return nil, err
}
if len(keys) == 0 {
return nil, nil
}
var files []TSMFile
defer func() {
for _, f := range files {
f.Unref()
}
}()
var iters []*TimeRangeIterator
// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
prefix := models.EscapeMeasurement(orgBucket)
e.FileStore.ForEachFile(func(f TSMFile) bool {
if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(prefix, prefix) {
f.Ref()
files = append(files, f)
iters = append(iters, f.TimeRangeIterator(prefix, start, end))
}
return true
})
tsmValues := make(map[string]struct{})
// reusable buffers
var (
tags models.Tags
keybuf []byte
sfkey []byte
)
for i := range keys {
_, tags = tsdb.ParseSeriesKeyInto(keys[i], tags[:0])
curVal := tags.Get(tagKeyBytes)
if len(curVal) == 0 {
continue
}
if _, ok := tsmValues[string(curVal)]; ok {
continue
}
keybuf = models.AppendMakeKey(keybuf[:0], prefix, tags)
sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))
if e.Cache.Values(sfkey).Contains(start, end) {
tsmValues[string(curVal)] = struct{}{}
continue
}
for _, iter := range iters {
if exact, _ := iter.Seek(sfkey); !exact {
continue
}
if iter.HasData() {
tsmValues[string(curVal)] = struct{}{}
break
}
}
}
vals := make([]string, 0, len(tsmValues))
for val := range tsmValues {
vals = append(vals, val)
}
sort.Strings(vals)
return cursors.NewStringSliceIterator(vals), nil
}
func (e *Engine) findCandidateKeys(ctx context.Context, orgBucket []byte, predicate influxql.Expr) ([][]byte, error) {
// determine candidate series keys
sitr, err := e.index.MeasurementSeriesByExprIterator(orgBucket, predicate)
if err != nil {
return nil, err
} else if sitr == nil {
return nil, nil
}
defer sitr.Close()
var keys [][]byte
for {
elem, err := sitr.Next()
if err != nil {
return nil, err
} else if elem.SeriesID.IsZero() {
break
}
key := e.sfile.SeriesKey(elem.SeriesID)
if len(key) == 0 {
continue
}
keys = append(keys, key)
}
return keys, nil
}
var errUnexpectedTagComparisonOperator = errors.New("unexpected tag comparison operator")
func ValidateTagPredicate(expr influxql.Expr) (err error) {
influxql.WalkFunc(expr, func(node influxql.Node) {
if err != nil {
return
}
switch n := node.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, influxql.OR, influxql.AND:
default:
err = errUnexpectedTagComparisonOperator
}
switch r := n.LHS.(type) {
case *influxql.VarRef:
case *influxql.BinaryExpr:
default:
err = fmt.Errorf("binary expression: LHS must be tag key reference, got: %T", r)
}
switch r := n.RHS.(type) {
case *influxql.StringLiteral:
case *influxql.RegexLiteral:
case *influxql.BinaryExpr:
default:
err = fmt.Errorf("binary expression: RHS must be string or regex, got: %T", r)
}
}
})
return err
}

View File

@ -0,0 +1,281 @@
package tsm1_test
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/influxdata/influxql"
)
func TestEngine_TagValues(t *testing.T) {
e, err := NewEngine()
if err != nil {
t.Fatal(err)
}
if err := e.Open(context.Background()); err != nil {
t.Fatal(err)
}
defer e.Close()
orgs := []struct {
org, bucket influxdb.ID
}{
{
org: 0x5020,
bucket: 0x5100,
},
{
org: 0x6000,
bucket: 0x6100,
},
}
// this org will require escaping the 0x20 byte
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
cpuA,host=0A,os=linux value=1.1 101
cpuA,host=AA,os=linux value=1.2 102
cpuA,host=AA,os=linux value=1.3 104
cpuA,host=CA,os=linux value=1.3 104
cpuA,host=CA,os=linux value=1.3 105
cpuA,host=DA,os=macOS value=1.3 106
memA,host=DA,os=macOS value=1.3 101`)
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
cpuB,host=0B,os=linux value=1.1 101
cpuB,host=AB,os=linux value=1.2 102
cpuB,host=AB,os=linux value=1.3 104
cpuB,host=CB,os=linux value=1.3 104
cpuB,host=CB,os=linux value=1.3 105
cpuB,host=DB,os=macOS value=1.3 106
memB,host=DB,os=macOS value=1.3 101`)
// send some points to TSM data
e.MustWriteSnapshot()
// delete some data from the first bucket
e.MustDeleteBucketRange(orgs[0].org, orgs[0].bucket, 0, 105)
// leave some points in the cache
e.MustWritePointsString(orgs[0].org, orgs[0].bucket, `
cpuA,host=0A,os=linux value=1.1 201
cpuA,host=AA,os=linux value=1.2 202
cpuA,host=AA,os=linux value=1.3 204
cpuA,host=BA,os=macOS value=1.3 204
cpuA,host=BA,os=macOS value=1.3 205
cpuA,host=EA,os=linux value=1.3 206
memA,host=EA,os=linux value=1.3 201`)
e.MustWritePointsString(orgs[1].org, orgs[1].bucket, `
cpuB,host=0B,os=linux value=1.1 201
cpuB,host=AB,os=linux value=1.2 202
cpuB,host=AB,os=linux value=1.3 204
cpuB,host=BB,os=linux value=1.3 204
cpuB,host=BB,os=linux value=1.3 205
cpuB,host=EB,os=macOS value=1.3 206
memB,host=EB,os=macOS value=1.3 201`)
type args struct {
org int
key string
min, max int64
expr string
}
var tests = []struct {
name string
args args
exp []string
}{
// ***********************
// * queries for the first org, which has some deleted data
// ***********************
// host tag
{
name: "TSM and cache",
args: args{
org: 0,
key: "host",
min: 0,
max: 300,
},
exp: []string{"0A", "AA", "BA", "DA", "EA"},
},
{
name: "only TSM",
args: args{
org: 0,
key: "host",
min: 0,
max: 199,
},
exp: []string{"DA"},
},
{
name: "only cache",
args: args{
org: 0,
key: "host",
min: 200,
max: 299,
},
exp: []string{"0A", "AA", "BA", "EA"},
},
{
name: "one timestamp TSM/data",
args: args{
org: 0,
key: "host",
min: 106,
max: 106,
},
exp: []string{"DA"},
},
{
name: "one timestamp cache/data",
args: args{
org: 0,
key: "host",
min: 201,
max: 201,
},
exp: []string{"0A", "EA"},
},
{
name: "one timestamp TSM/nodata",
args: args{
org: 0,
key: "host",
min: 103,
max: 103,
},
exp: nil,
},
{
name: "one timestamp cache/nodata",
args: args{
org: 0,
key: "host",
min: 203,
max: 203,
},
exp: nil,
},
// _measurement tag
{
name: "_measurement/all",
args: args{
org: 0,
key: "_measurement",
min: 0,
max: 399,
},
exp: []string{"cpuA", "memA"},
},
{
name: "_measurement/some",
args: args{
org: 0,
key: "_measurement",
min: 205,
max: 399,
},
exp: []string{"cpuA"},
},
// queries with predicates
{
name: "predicate/macOS",
args: args{
org: 0,
key: "host",
min: 0,
max: 300,
expr: `os = 'macOS'`,
},
exp: []string{"BA", "DA"},
},
{
name: "predicate/linux",
args: args{
org: 0,
key: "host",
min: 0,
max: 300,
expr: `os = 'linux'`,
},
exp: []string{"0A", "AA", "EA"},
},
// ***********************
// * queries for the second org, which has no deleted data
// ***********************
{
args: args{
org: 1,
key: "host",
min: 0,
max: 1000,
},
exp: []string{"0B", "AB", "BB", "CB", "DB", "EB"},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
a := tc.args
var expr influxql.Expr
if len(a.expr) > 0 {
expr = influxql.MustParseExpr(a.expr)
}
iter, err := e.TagValues(context.Background(), orgs[a.org].org, orgs[a.org].bucket, a.key, a.min, a.max, expr)
if err != nil {
t.Fatalf("TagValues: error %v", err)
}
got := cursors.StringIteratorToSlice(iter)
if !cmp.Equal(got, tc.exp) {
t.Errorf("unexpected TagValues: -got/+exp\n%v", cmp.Diff(got, tc.exp))
}
})
}
}
func TestValidateTagPredicate(t *testing.T) {
tests := []struct {
name string
expr string
wantErr bool
}{
{
expr: `"_m" = 'foo'`,
wantErr: false,
},
{
expr: `_m = 'foo'`,
wantErr: false,
},
{
expr: `_m = foo`,
wantErr: true,
},
{
expr: `_m = 5`,
wantErr: true,
},
{
expr: `_m =~ //`,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tsm1.ValidateTagPredicate(influxql.MustParseExpr(tt.expr)); (err != nil) != tt.wantErr {
t.Errorf("ValidateTagPredicate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

View File

@ -13,6 +13,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
@ -468,6 +469,29 @@ func (e *Engine) MustWriteSnapshot() {
}
}
// MustWritePointsString parses and writes the specified points to the
// provided org and bucket. Panic on error.
func (e *Engine) MustWritePointsString(org, bucket influxdb.ID, buf string) {
err := e.writePoints(MustParseExplodePoints(org, bucket, buf)...)
if err != nil {
panic(err)
}
}
// MustDeleteBucketRange calls DeleteBucketRange using the org and bucket for
// the name. Panic on error.
func (e *Engine) MustDeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) {
// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
encoded := tsdb.EncodeName(orgID, bucketID)
name := models.EscapeMeasurement(encoded[:])
err := e.DeleteBucketRange(name, min, max)
if err != nil {
panic(err)
}
}
func MustOpenIndex(path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile) *tsi1.Index {
idx := tsi1.NewIndex(sfile, tsi1.NewConfig(), tsi1.WithPath(path))
if err := idx.Open(context.Background()); err != nil {
@ -516,6 +540,17 @@ func MustParsePointsString(buf string) []models.Point {
return a
}
// MustParseExplodePoints parses points from a string and transforms using
// ExplodePoints using the provided org and bucket. Panic on error.
func MustParseExplodePoints(org, bucket influxdb.ID, buf string) []models.Point {
a := MustParsePointsString(buf)
a, err := tsdb.ExplodePoints(org, bucket, a)
if err != nil {
panic(err)
}
return a
}
// MustParsePointString parses the first point from a string. Panic on error.
func MustParsePointString(buf string) models.Point { return MustParsePointsString(buf)[0] }

View File

@ -79,6 +79,11 @@ type TSMFile interface {
// OverlapsKeyRange returns true if the key range of the file intersects min and max.
OverlapsKeyRange(min, max []byte) bool
// OverlapsKeyPrefixRange returns true if the key range of the file
// intersects min and max, evaluating up to the length of min and max
// of the key range.
OverlapsKeyPrefixRange(min, max []byte) bool
// TimeRange returns the min and max time across all keys in the file.
TimeRange() (int64, int64)
@ -427,6 +432,31 @@ func (f *FileStore) Delete(keys [][]byte) error {
return f.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}
type unrefs []TSMFile
func (u *unrefs) Unref() {
for _, f := range *u {
f.Unref()
}
}
// ForEachFile calls fn for all TSM files or until fn returns false.
// fn is called on the same goroutine as the caller.
func (f *FileStore) ForEachFile(fn func(f TSMFile) bool) {
f.mu.RLock()
files := make(unrefs, 0, len(f.files))
defer files.Unref()
for _, f := range f.files {
f.Ref()
files = append(files, f)
if !fn(f) {
break
}
}
f.mu.RUnlock()
}
func (f *FileStore) Apply(fn func(r TSMFile) error) error {
// Limit apply fn to number of cores
limiter := limiter.NewFixed(runtime.GOMAXPROCS(0))

View File

@ -358,6 +358,13 @@ func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
return t.index.OverlapsKeyRange(min, max)
}
// OverlapsKeyPrefixRange returns true if the key range of the file
// intersects min and max, evaluating up to the length of min and max
// of the key range.
func (t *TSMReader) OverlapsKeyPrefixRange(min, max []byte) bool {
return t.index.OverlapsKeyPrefixRange(min, max)
}
// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (int64, int64) {
return t.index.TimeRange()

View File

@ -59,6 +59,11 @@ type TSMIndex interface {
// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
OverlapsKeyRange(min, max []byte) bool
// OverlapsKeyPrefixRange returns true if the key range of the file
// intersects min and max, evaluating up to the length of min and max
// of the key range.
OverlapsKeyPrefixRange(min, max []byte) bool
// Size returns the size of the current index in bytes.
Size() uint32
@ -625,6 +630,20 @@ func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool {
return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0
}
// OverlapsKeyPrefixRange returns true if the key range of the file
// intersects min and max, evaluating up to the length of min and max
// of the key range.
func (d *indirectIndex) OverlapsKeyPrefixRange(min, max []byte) bool {
minKey, maxKey := d.minKey, d.maxKey
if len(maxKey) > len(min) {
maxKey = maxKey[:len(min)]
}
if len(minKey) > len(max) {
minKey = minKey[:len(max)]
}
return bytes.Compare(minKey, max) <= 0 && bytes.Compare(maxKey, min) >= 0
}
// KeyRange returns the min and max keys in the index.
func (d *indirectIndex) KeyRange() ([]byte, []byte) {
return d.minKey, d.maxKey