influxdb/tsdb/tsm1/engine_schema.go

package tsm1

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/cursors"
	"github.com/influxdata/influxql"
)

// cancelCheckInterval represents the period at which TagKeys and TagValues
// will check for a canceled context. Specifically, after every 64 series
// scanned, the query context is checked for cancellation, and if canceled,
// the calls return immediately.
const cancelCheckInterval = 64

// TagValues returns an iterator which enumerates the values for the specific
// tagKey in the given bucket matching the predicate within the
// time range (start, end].
//
// TagValues will always return a StringIterator if there is no error.
//
// If the context is canceled before TagValues has finished processing, a non-nil
// error will be returned along with a partial result of the already scanned values.
func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
	encoded := tsdb.EncodeName(orgID, bucketID)

	if predicate == nil {
		return e.tagValuesNoPredicate(ctx, encoded[:], []byte(tagKey), start, end)
	}

	return e.tagValuesPredicate(ctx, encoded[:], []byte(tagKey), start, end, predicate)
}
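
// A hedged usage sketch (not part of the original file): a caller might drain
// the returned cursors.StringIterator as below. The names eng, org, bucket,
// start, and end are illustrative and assumed to be in scope.
//
//	it, err := eng.TagValues(ctx, org, bucket, "host", start, end, nil)
//	if err != nil {
//		// on cancellation, it still holds the partial result scanned so far
//	}
//	for it.Next() {
//		fmt.Println(it.Value())
//	}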

func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgBucket, tagKeyBytes []byte, start, end int64) (cursors.StringIterator, error) {
	tsmValues := make(map[string]struct{})
	var tags models.Tags

	// TODO(edd): we need to clean up how we're encoding the prefix so that we
	// don't have to remember to get it right everywhere we need to touch TSM data.
	prefix := models.EscapeMeasurement(orgBucket)

	// TODO(sgc): extend prefix when filtering by \x00 == <measurement>

	var stats cursors.CursorStats
	var canceled bool

	e.FileStore.ForEachFile(func(f TSMFile) bool {
		// Check the context before accessing each TSM file.
		select {
		case <-ctx.Done():
			canceled = true
			return false
		default:
		}

		if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(prefix, prefix) {
			// TODO(sgc): create f.TimeRangeIterator(minKey, maxKey, start, end)
			iter := f.TimeRangeIterator(prefix, start, end)
			for i := 0; iter.Next(); i++ {
				sfkey := iter.Key()
				if !bytes.HasPrefix(sfkey, prefix) {
					// end of org+bucket
					break
				}

				key, _ := SeriesAndFieldFromCompositeKey(sfkey)
				tags = models.ParseTagsWithTags(key, tags[:0])
				curVal := tags.Get(tagKeyBytes)
				if len(curVal) == 0 {
					continue
				}

				if _, ok := tsmValues[string(curVal)]; ok {
					continue
				}

				if iter.HasData() {
					tsmValues[string(curVal)] = struct{}{}
				}
			}
			stats.Add(iter.Stats())
		}

		return true
	})

	if canceled {
		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
	}

	// With performance in mind, we explicitly do not check the context
	// while scanning the entries in the cache.
	prefixStr := string(prefix)
	_ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error {
		if !strings.HasPrefix(sfkey, prefixStr) {
			return nil
		}

		// TODO(edd): consider the []byte() conversion here.
		key, _ := SeriesAndFieldFromCompositeKey([]byte(sfkey))
		tags = models.ParseTagsWithTags(key, tags[:0])
		curVal := tags.Get(tagKeyBytes)
		if len(curVal) == 0 {
			return nil
		}

		if _, ok := tsmValues[string(curVal)]; ok {
			return nil
		}

		stats.ScannedValues += entry.values.Len()
		stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp

		if entry.values.Contains(start, end) {
			tsmValues[string(curVal)] = struct{}{}
		}

		return nil
	})

	vals := make([]string, 0, len(tsmValues))
	for val := range tsmValues {
		vals = append(vals, val)
	}
	sort.Strings(vals)

	return cursors.NewStringSliceIteratorWithStats(vals, stats), nil
}
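
// tagValuesPredicate enumerates the values of tagKeyBytes for series matching
// predicate. It first resolves candidate series keys from the index, then
// confirms each candidate has data in the cache or the TSM files within the
// time range (start, end].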
func (e *Engine) tagValuesPredicate(ctx context.Context, orgBucket, tagKeyBytes []byte, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
	if err := ValidateTagPredicate(predicate); err != nil {
		return nil, err
	}

	keys, err := e.findCandidateKeys(ctx, orgBucket, predicate)
	if err != nil {
		return cursors.EmptyStringIterator, err
	}

	if len(keys) == 0 {
		return cursors.EmptyStringIterator, nil
	}

	var files []TSMFile
	defer func() {
		for _, f := range files {
			f.Unref()
		}
	}()
	var iters []*TimeRangeIterator

	// TODO(edd): we need to clean up how we're encoding the prefix so that we
	// don't have to remember to get it right everywhere we need to touch TSM data.
	prefix := models.EscapeMeasurement(orgBucket)

	var canceled bool

	e.FileStore.ForEachFile(func(f TSMFile) bool {
		// Check the context before accessing each TSM file.
		select {
		case <-ctx.Done():
			canceled = true
			return false
		default:
		}

		if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(prefix, prefix) {
			f.Ref()
			files = append(files, f)
			iters = append(iters, f.TimeRangeIterator(prefix, start, end))
		}

		return true
	})

	var stats cursors.CursorStats

	if canceled {
		stats = statsFromIters(stats, iters)
		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
	}

	tsmValues := make(map[string]struct{})

	// reusable buffers
	var (
		tags   models.Tags
		keybuf []byte
		sfkey  []byte
	)

	for i := range keys {
		// to keep cache scans fast, check the context every 'cancelCheckInterval' iterations
		if i%cancelCheckInterval == 0 {
			select {
			case <-ctx.Done():
				stats = statsFromIters(stats, iters)
				return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
			default:
			}
		}

		_, tags = tsdb.ParseSeriesKeyInto(keys[i], tags[:0])
		curVal := tags.Get(tagKeyBytes)
		if len(curVal) == 0 {
			continue
		}

		if _, ok := tsmValues[string(curVal)]; ok {
			continue
		}

		keybuf = models.AppendMakeKey(keybuf[:0], prefix, tags)
		sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))

		values := e.Cache.Values(sfkey)
		stats.ScannedValues += values.Len()
		stats.ScannedBytes += values.Len() * 8 // sizeof timestamp

		if values.Contains(start, end) {
			tsmValues[string(curVal)] = struct{}{}
			continue
		}

		for _, iter := range iters {
			if exact, _ := iter.Seek(sfkey); !exact {
				continue
			}

			if iter.HasData() {
				tsmValues[string(curVal)] = struct{}{}
				break
			}
		}
	}

	vals := make([]string, 0, len(tsmValues))
	for val := range tsmValues {
		vals = append(vals, val)
	}
	sort.Strings(vals)

	stats = statsFromIters(stats, iters)
	return cursors.NewStringSliceIteratorWithStats(vals, stats), nil
}
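
// findCandidateKeys returns the series keys in orgBucket that match predicate,
// checking the context for cancellation every cancelCheckInterval iterations.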
func (e *Engine) findCandidateKeys(ctx context.Context, orgBucket []byte, predicate influxql.Expr) ([][]byte, error) {
	// determine candidate series keys
	sitr, err := e.index.MeasurementSeriesByExprIterator(orgBucket, predicate)
	if err != nil {
		return nil, err
	} else if sitr == nil {
		return nil, nil
	}
	defer sitr.Close()

	var keys [][]byte
	for i := 0; ; i++ {
		// to keep series file index scans fast,
		// check the context every 'cancelCheckInterval' iterations
		if i%cancelCheckInterval == 0 {
			select {
			case <-ctx.Done():
				return keys, ctx.Err()
			default:
			}
		}

		elem, err := sitr.Next()
		if err != nil {
			return nil, err
		} else if elem.SeriesID.IsZero() {
			break
		}

		key := e.sfile.SeriesKey(elem.SeriesID)
		if len(key) == 0 {
			continue
		}

		keys = append(keys, key)
	}

	return keys, nil
}

// TagKeys returns an iterator which enumerates the tag keys for the given
// bucket matching the predicate within the time range (start, end].
//
// TagKeys will always return a StringIterator if there is no error.
//
// If the context is canceled before TagKeys has finished processing, a non-nil
// error will be returned along with a partial result of the already scanned keys.
func (e *Engine) TagKeys(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
	encoded := tsdb.EncodeName(orgID, bucketID)

	if predicate == nil {
		return e.tagKeysNoPredicate(ctx, encoded[:], start, end)
	}

	return e.tagKeysPredicate(ctx, encoded[:], start, end, predicate)
}
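
// A hedged usage sketch (not part of the original file): enumerating tag keys
// under a predicate. eng, org, bucket, start, and end are illustrative and
// assumed to be in scope; the predicate is built with influxql.ParseExpr.
//
//	pred, _ := influxql.ParseExpr(`region = 'us-west'`)
//	it, err := eng.TagKeys(ctx, org, bucket, start, end, pred)
//	if err != nil {
//		// on cancellation, it still holds the keys scanned so far
//	}
//	for it.Next() {
//		fmt.Println(it.Value())
//	}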

func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgBucket []byte, start, end int64) (cursors.StringIterator, error) {
	var tags models.Tags

	// TODO(edd): we need to clean up how we're encoding the prefix so that we
	// don't have to remember to get it right everywhere we need to touch TSM data.
	prefix := models.EscapeMeasurement(orgBucket)

	var keyset models.TagKeysSet

	// TODO(sgc): extend prefix when filtering by \x00 == <measurement>

	var stats cursors.CursorStats
	var canceled bool

	e.FileStore.ForEachFile(func(f TSMFile) bool {
		// Check the context before touching each TSM file.
		select {
		case <-ctx.Done():
			canceled = true
			return false
		default:
		}

		if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(prefix, prefix) {
			// TODO(sgc): create f.TimeRangeIterator(minKey, maxKey, start, end)
			iter := f.TimeRangeIterator(prefix, start, end)
			for i := 0; iter.Next(); i++ {
				sfkey := iter.Key()
				if !bytes.HasPrefix(sfkey, prefix) {
					// end of org+bucket
					break
				}

				key, _ := SeriesAndFieldFromCompositeKey(sfkey)
				tags = models.ParseTagsWithTags(key, tags[:0])
				if keyset.IsSupersetKeys(tags) {
					continue
				}

				if iter.HasData() {
					keyset.UnionKeys(tags)
				}
			}
			stats.Add(iter.Stats())
		}

		return true
	})

	if canceled {
		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
	}

	// With performance in mind, we explicitly do not check the context
	// while scanning the entries in the cache.
	_ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error {
		if !strings.HasPrefix(sfkey, string(prefix)) {
			return nil
		}

		// TODO(edd): consider []byte conversion here.
		key, _ := SeriesAndFieldFromCompositeKey([]byte(sfkey))
		tags = models.ParseTagsWithTags(key, tags[:0])
		if keyset.IsSupersetKeys(tags) {
			return nil
		}

		stats.ScannedValues += entry.values.Len()
		stats.ScannedBytes += entry.values.Len() * 8 // sizeof timestamp

		if entry.values.Contains(start, end) {
			keyset.UnionKeys(tags)
		}

		return nil
	})

	return cursors.NewStringSliceIteratorWithStats(keyset.Keys(), stats), nil
}
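
// tagKeysPredicate enumerates the tag keys of series matching predicate. It
// first resolves candidate series keys from the index, then confirms each
// candidate has data in the cache or the TSM files within the time range
// (start, end].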
func (e *Engine) tagKeysPredicate(ctx context.Context, orgBucket []byte, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
	if err := ValidateTagPredicate(predicate); err != nil {
		return nil, err
	}

	keys, err := e.findCandidateKeys(ctx, orgBucket, predicate)
	if err != nil {
		return cursors.EmptyStringIterator, err
	}

	if len(keys) == 0 {
		return cursors.EmptyStringIterator, nil
	}

	var files []TSMFile
	defer func() {
		for _, f := range files {
			f.Unref()
		}
	}()
	var iters []*TimeRangeIterator

	// TODO(edd): we need to clean up how we're encoding the prefix so that we
	// don't have to remember to get it right everywhere we need to touch TSM data.
	prefix := models.EscapeMeasurement(orgBucket)

	var canceled bool

	e.FileStore.ForEachFile(func(f TSMFile) bool {
		// Check the context before touching each TSM file.
		select {
		case <-ctx.Done():
			canceled = true
			return false
		default:
		}

		if f.OverlapsTimeRange(start, end) && f.OverlapsKeyPrefixRange(prefix, prefix) {
			f.Ref()
			files = append(files, f)
			iters = append(iters, f.TimeRangeIterator(prefix, start, end))
		}

		return true
	})

	var stats cursors.CursorStats

	if canceled {
		stats = statsFromIters(stats, iters)
		return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
	}

	var keyset models.TagKeysSet

	// reusable buffers
	var (
		tags   models.Tags
		keybuf []byte
		sfkey  []byte
	)

	for i := range keys {
		// to keep cache scans fast, check the context every 'cancelCheckInterval' iterations
		if i%cancelCheckInterval == 0 {
			select {
			case <-ctx.Done():
				stats = statsFromIters(stats, iters)
				return cursors.NewStringSliceIteratorWithStats(nil, stats), ctx.Err()
			default:
			}
		}

		_, tags = tsdb.ParseSeriesKeyInto(keys[i], tags[:0])
		if keyset.IsSupersetKeys(tags) {
			continue
		}

		keybuf = models.AppendMakeKey(keybuf[:0], prefix, tags)
		sfkey = AppendSeriesFieldKeyBytes(sfkey[:0], keybuf, tags.Get(models.FieldKeyTagKeyBytes))

		values := e.Cache.Values(sfkey)
		stats.ScannedValues += values.Len()
		stats.ScannedBytes += values.Len() * 8 // sizeof timestamp

		if values.Contains(start, end) {
			keyset.UnionKeys(tags)
			continue
		}

		for _, iter := range iters {
			if exact, _ := iter.Seek(sfkey); !exact {
				continue
			}

			if iter.HasData() {
				keyset.UnionKeys(tags)
				break
			}
		}
	}

	stats = statsFromIters(stats, iters)
	return cursors.NewStringSliceIteratorWithStats(keyset.Keys(), stats), nil
}
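
// statsFromIters adds the stats of each iterator in iters to stats and
// returns the accumulated result.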
func statsFromIters(stats cursors.CursorStats, iters []*TimeRangeIterator) cursors.CursorStats {
	for _, iter := range iters {
		stats.Add(iter.Stats())
	}
	return stats
}

var errUnexpectedTagComparisonOperator = errors.New("unexpected tag comparison operator")
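
// ValidateTagPredicate checks that expr is a valid tag-only predicate: binary
// expressions may use the =, !=, =~, !~, OR, and AND operators, the LHS must
// be a tag key reference (or a nested binary expression), and the RHS must be
// a string literal, regex literal, or nested binary expression.
//
// A hedged sketch (the literals are illustrative, not from the original file):
//
//	expr, _ := influxql.ParseExpr(`host = 'server01' AND region =~ /us-.*/`)
//	err := ValidateTagPredicate(expr) // nil: tag comparisons only
//
//	expr, _ = influxql.ParseExpr(`host > 'server01'`)
//	err = ValidateTagPredicate(expr) // errUnexpectedTagComparisonOperator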
func ValidateTagPredicate(expr influxql.Expr) (err error) {
	influxql.WalkFunc(expr, func(node influxql.Node) {
		if err != nil {
			return
		}

		switch n := node.(type) {
		case *influxql.BinaryExpr:
			switch n.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, influxql.OR, influxql.AND:
			default:
				err = errUnexpectedTagComparisonOperator
			}

			switch r := n.LHS.(type) {
			case *influxql.VarRef:
			case *influxql.BinaryExpr:
			default:
				err = fmt.Errorf("binary expression: LHS must be tag key reference, got: %T", r)
			}

			switch r := n.RHS.(type) {
			case *influxql.StringLiteral:
			case *influxql.RegexLiteral:
			case *influxql.BinaryExpr:
			default:
				err = fmt.Errorf("binary expression: RHS must be string or regex, got: %T", r)
			}
		}
	})

	return err
}