fix(influxql): make meta queries respect query timeout (#21545)

Co-authored-by: davidby-influx <dbyrne@influxdata.com>
pull/21550/head
Daniel Moran 2021-05-24 21:10:53 -04:00 committed by GitHub
parent b1e1125376
commit 00420fb54c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 289 additions and 95 deletions

View File

@ -31,7 +31,7 @@ type Engine interface {
influxdb.BackupService
influxdb.RestoreService
SeriesCardinality(orgID, bucketID platform.ID) int64
SeriesCardinality(ctx context.Context, bucketID platform.ID) int64
TSDBStore() storage.TSDBStore
MetaClient() storage.MetaClient
@ -116,8 +116,8 @@ func (t *TemporaryEngine) WritePoints(ctx context.Context, orgID platform.ID, bu
}
// SeriesCardinality returns the number of series in the engine.
func (t *TemporaryEngine) SeriesCardinality(orgID, bucketID platform.ID) int64 {
return t.engine.SeriesCardinality(orgID, bucketID)
func (t *TemporaryEngine) SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 {
return t.engine.SeriesCardinality(ctx, bucketID)
}
// DeleteBucketRangePredicate will delete a bucket from the range and predicate.

View File

@ -128,7 +128,7 @@ func TestLauncher_BucketDelete(t *testing.T) {
// Verify the cardinality in the engine.
engine := l.Launcher.Engine()
if got, exp := engine.SeriesCardinality(l.Org.ID, l.Bucket.ID), int64(1); got != exp {
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(1); got != exp {
t.Fatalf("got %d, exp %d", got, exp)
}
@ -150,7 +150,7 @@ func TestLauncher_BucketDelete(t *testing.T) {
}
// Verify that the data has been removed from the storage engine.
if got, exp := engine.SeriesCardinality(l.Org.ID, l.Bucket.ID), int64(0); got != exp {
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(0); got != exp {
t.Fatalf("after bucket delete got %d, exp %d", got, exp)
}
}

View File

@ -1,6 +1,7 @@
package internal
import (
"context"
"io"
"time"
@ -30,7 +31,7 @@ type TSDBStoreMock struct {
ImportShardFn func(id uint64, r io.Reader) error
MeasurementSeriesCountsFn func(database string) (measurements int, series int)
MeasurementsCardinalityFn func(database string) (int64, error)
MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
OpenFn func() error
PathFn func() string
RestoreShardFn func(id uint64, r io.Reader) error
@ -43,8 +44,8 @@ type TSDBStoreMock struct {
ShardRelativePathFn func(id uint64) (string, error)
ShardsFn func(ids []uint64) []*tsdb.Shard
StatisticsFn func(tags map[string]string) []models.Statistic
TagKeysFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
TagKeysFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
WithLoggerFn func(log *zap.Logger)
WriteToShardFn func(shardID uint64, points []models.Point) error
}
@ -92,8 +93,8 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source
func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
return s.ImportShardFn(id, r)
}
func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(auth, database, cond)
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(ctx, auth, database, cond)
}
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) {
return s.MeasurementSeriesCountsFn(database)
@ -137,11 +138,11 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard {
func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic {
return s.StatisticsFn(tags)
}
func (s *TSDBStoreMock) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(auth, shardIDs, cond)
func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(ctx, auth, shardIDs, cond)
}
func (s *TSDBStoreMock) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(auth, shardIDs, cond)
func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(ctx, auth, shardIDs, cond)
}
func (s *TSDBStoreMock) WithLogger(log *zap.Logger) {
s.WithLoggerFn(log)

View File

@ -90,11 +90,11 @@ type MetaClient interface {
type TSDBStore interface {
DeleteMeasurement(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
ShardGroup(ids []uint64) tsdb.ShardGroup
Shards(ids []uint64) []*tsdb.Shard
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
}
// NewEngine initialises a new storage engine, including a series file, index and
@ -463,14 +463,14 @@ func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader)
}
// SeriesCardinality returns the number of series in the engine.
func (e *Engine) SeriesCardinality(orgID, bucketID platform.ID) int64 {
func (e *Engine) SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return 0
}
n, err := e.tsdbStore.SeriesCardinality(bucketID.String())
n, err := e.tsdbStore.SeriesCardinality(ctx, bucketID.String())
if err != nil {
return 0
}

View File

@ -139,13 +139,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
statistics := make([]models.Statistic, 0, len(databases))
for _, database := range databases {
log := s.Logger.With(logger.Database(database))
sc, err := s.SeriesCardinality(database)
sc, err := s.SeriesCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve series cardinality", zap.Error(err))
continue
}
mc, err := s.MeasurementsCardinality(database)
mc, err := s.MeasurementsCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve measurement cardinality", zap.Error(err))
continue
@ -1049,7 +1049,7 @@ func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (est
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
//
func (s *Store) SeriesCardinality(database string) (int64, error) {
func (s *Store) SeriesCardinality(ctx context.Context, database string) (int64, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
@ -1057,7 +1057,12 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
var setMu sync.Mutex
others := make([]*SeriesIDSet, 0, len(shards))
s.walkShards(shards, func(sh *Shard) error {
err := s.walkShards(shards, func(sh *Shard) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
index, err := sh.Index()
if err != nil {
return err
@ -1070,9 +1075,17 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
return nil
})
if err != nil {
return 0, err
}
ss := NewSeriesIDSet()
ss.Merge(others...)
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
return int64(ss.Cardinality()), nil
}
@ -1081,8 +1094,13 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
func (s *Store) SeriesSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
@ -1095,13 +1113,8 @@ func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Ske
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
})
func (s *Store) MeasurementsCardinality(ctx context.Context, database string) (int64, error) {
ss, ts, err := s.MeasurementsSketches(ctx, database)
if err != nil {
return 0, err
@ -1114,8 +1127,14 @@ func (s *Store) MeasurementsCardinality(database string) (int64, error) {
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
func (s *Store) MeasurementsSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
// every iteration, check for timeout.
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
@ -1430,7 +1449,7 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
// MeasurementNames returns a slice of all measurements. Measurements accepts an
// optional condition expression. If cond is nil, then all measurements for the
// database will be returned.
func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
@ -1449,6 +1468,11 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in
}
is.Indexes = append(is.Indexes, index)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return is.MeasurementNamesByExpr(auth, cond)
}
@ -1471,7 +1495,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }
// TagKeys returns the tag keys in the given database, matching the condition.
func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
if len(shardIDs) == 0 {
return nil, nil
}
@ -1543,6 +1567,13 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
var results []TagKeys
for _, name := range names {
// Check for timeouts.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Build keyset over all indexes for measurement.
tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil)
if err != nil {
@ -1556,6 +1587,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
// If they have authorized series associated with them.
if filterExpr == nil {
for tagKey := range tagKeySet {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey))
if err != nil {
return nil, err
@ -1636,7 +1673,7 @@ func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[
// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
if cond == nil {
return nil, errors.New("a condition is required")
}
@ -1724,6 +1761,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
// values from matching series. Series may be filtered using a WHERE
// filter.
for _, name := range names {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Determine a list of keys from condition.
keySet, err := is.MeasurementTagKeysByExpr(name, cond)
if err != nil {
@ -1786,6 +1830,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
// instances of tagValues for a given measurement.
idxBuf := make([][2]int, 0, len(is.Indexes))
for i < len(allResults) {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Gather all occurrences of the same measurement for merging.
for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) {
j++

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@ -243,7 +244,7 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
t.Fatal(err)
}
measurements, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil)
measurements, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
if err != nil {
t.Fatal(err)
}
@ -302,7 +303,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
wg.Wait()
keys, err := s.TagKeys(nil, []uint64{1, 2}, nil)
keys, err := s.TagKeys(context.Background(), nil, []uint64{1, 2}, nil)
if err != nil {
t.Fatal(err)
}
@ -400,7 +401,7 @@ func TestStore_DeleteShard(t *testing.T) {
// cpu,serverb=b should be removed from the series file for db0 because
// shard 1 was the only owner of that series.
// Verify by getting all tag keys.
keys, err := s.TagKeys(nil, []uint64{2}, nil)
keys, err := s.TagKeys(context.Background(), nil, []uint64{2}, nil)
if err != nil {
return err
}
@ -415,7 +416,7 @@ func TestStore_DeleteShard(t *testing.T) {
// Verify that the same series was not removed from other databases'
// series files.
if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil {
if keys, err = s.TagKeys(context.Background(), nil, []uint64{3}, nil); err != nil {
return err
}
@ -801,7 +802,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
`cpu value=3 20`,
)
meas, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil)
meas, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
if err != nil {
t.Fatalf("unexpected error with MeasurementNames: %v", err)
}
@ -842,7 +843,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
}
// Delete all the series for each measurement.
mnames, err := store.MeasurementNames(nil, "db", nil)
mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil)
if err != nil {
t.Fatal(err)
}
@ -854,7 +855,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -866,7 +867,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
// Since all the series have been deleted, all the measurements should have
// been removed from the index too.
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
t.Fatal(err)
}
@ -919,7 +920,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -930,7 +931,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) {
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
t.Fatal(err)
}
@ -999,7 +1000,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -1010,7 +1011,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) {
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
t.Fatal(err)
}
@ -1041,6 +1042,149 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
}
}
func TestStore_MetaQuery_Timeout(t *testing.T) {
if testing.Short() || os.Getenv("APPVEYOR") != "" {
t.Skip("Skipping test in short and appveyor mode.")
}
test := func(t *testing.T, index string) {
store := NewStore(t, index)
require.NoError(t, store.Open())
defer store.Close()
testStoreMetaQueryTimeout(t, store, index)
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
test(t, index)
})
}
}
func testStoreMetaQueryTimeout(t *testing.T, store *Store, index string) {
shards := testStoreMetaQuerySetup(t, store)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "SeriesCardinality"
_, err := store.Store.SeriesCardinality(ctx, "db")
return funcName, err
}, index)(t)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "MeasurementsCardinality"
_, err := store.Store.MeasurementsCardinality(ctx, "db")
return funcName, err
}, index)(t)
keyCondition, allCondition := testStoreMetaQueryCondition()
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "TagValues"
_, err := store.Store.TagValues(ctx, nil, shards, allCondition)
return funcName, err
}, index)(t)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "TagKeys"
_, err := store.Store.TagKeys(ctx, nil, shards, keyCondition)
return funcName, err
}, index)(t)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "MeasurementNames"
_, err := store.Store.MeasurementNames(ctx, nil, "db", nil)
return funcName, err
}, index)(t)
}
func testStoreMetaQueryCondition() (influxql.Expr, influxql.Expr) {
keyCondition := &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.OR,
LHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "tagKey4"},
},
RHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "tagKey5"},
},
},
}
whereCondition := &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "tagKey1"},
RHS: &influxql.StringLiteral{Val: "tagValue2"},
},
},
RHS: keyCondition,
},
}
allCondition := &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.EQREGEX,
LHS: &influxql.VarRef{Val: "tagKey3"},
RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`tagValue\d`)},
},
},
RHS: whereCondition,
}
return keyCondition, allCondition
}
func testStoreMetaQuerySetup(t *testing.T, store *Store) []uint64 {
const measurementCnt = 64
const tagCnt = 5
const valueCnt = 5
const pointsPerShard = 20000
// Generate point data to write to the shards.
series := genTestSeries(measurementCnt, tagCnt, valueCnt)
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
// shards such that we never write the same series to multiple shards.
shards := make([]uint64, len(points)/pointsPerShard)
for shardID := 0; shardID < len(points)/pointsPerShard; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
t.Fatalf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points[shardID*pointsPerShard:(shardID+1)*pointsPerShard]); err != nil {
t.Fatalf("batch write: %s", err)
}
shards[shardID] = uint64(shardID)
}
return shards
}
func testStoreMakeTimedFuncs(tested func(context.Context) (string, error), index string) func(*testing.T) {
cancelTested := func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(0))
defer cancel()
funcName, err := tested(ctx)
if err == nil {
t.Fatalf("%v: failed to time out with index type %v", funcName, index)
} else if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
t.Fatalf("%v: failed with %v instead of %v with index type %v", funcName, err, context.DeadlineExceeded, index)
}
}
return cancelTested
}
// Creates a large number of series in multiple shards, which will force
// compactions to occur.
func testStoreCardinalityCompactions(store *Store) error {
@ -1066,7 +1210,7 @@ func testStoreCardinalityCompactions(store *Store) error {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
return err
}
@ -1077,7 +1221,7 @@ func testStoreCardinalityCompactions(store *Store) error {
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
return err
}
@ -1116,7 +1260,7 @@ func TestStore_Sketches(t *testing.T) {
checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error {
// Get sketches and check cardinality...
sketch, tsketch, err := store.SeriesSketches("db")
sketch, tsketch, err := store.SeriesSketches(context.Background(), "db")
if err != nil {
return err
}
@ -1142,7 +1286,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Check measurement cardinality.
if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil {
if sketch, tsketch, err = store.MeasurementsSketches(context.Background(), "db"); err != nil {
return err
}
@ -1196,7 +1340,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Delete half the the measurements data
mnames, err := store.MeasurementNames(nil, "db", nil)
mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil)
if err != nil {
return err
}
@ -1322,9 +1466,8 @@ func TestStore_TagValues(t *testing.T) {
},
}
var s *Store
setup := func(t *testing.T, index string) []uint64 { // returns shard ids
s = MustOpenStore(t, index)
setup := func(t *testing.T, index string) (*Store, []uint64) { // returns shard ids
s := MustOpenStore(t, index)
fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
cpu1%[1]d,host=nofoo value=1 %[4]d
@ -1349,14 +1492,15 @@ func TestStore_TagValues(t *testing.T) {
ids = append(ids, uint64(i))
s.MustCreateShardWithData("db0", "rp0", i, genPoints(i)...)
}
return ids
return s, ids
}
for _, example := range examples {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(example.Name+"_"+index, func(t *testing.T) {
shardIDs := setup(t, index)
got, err := s.TagValues(nil, shardIDs, example.Expr)
s, shardIDs := setup(t, index)
defer s.Close()
got, err := s.TagValues(context.Background(), nil, shardIDs, example.Expr)
if err != nil {
t.Fatal(err)
}
@ -1366,7 +1510,6 @@ func TestStore_TagValues(t *testing.T) {
t.Fatalf("got:\n%#v\n\nexp:\n%#v", got, exp)
}
})
s.Close()
}
}
}
@ -1397,7 +1540,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
},
}
names, err := s.MeasurementNames(authorizer, "db0", nil)
names, err := s.MeasurementNames(context.Background(), authorizer, "db0", nil)
if err != nil {
return err
}
@ -1427,7 +1570,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
return err
}
if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil {
if names, err = s.MeasurementNames(context.Background(), authorizer, "db0", nil); err != nil {
return err
}
@ -1483,7 +1626,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
},
}
keys, err := s.TagKeys(authorizer, []uint64{0}, nil)
keys, err := s.TagKeys(context.Background(), authorizer, []uint64{0}, nil)
if err != nil {
return err
}
@ -1518,7 +1661,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
return err
}
if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil {
if keys, err = s.TagKeys(context.Background(), authorizer, []uint64{0}, nil); err != nil {
return err
}
@ -1580,7 +1723,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
},
}
values, err := s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
values, err := s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "host"},
@ -1620,7 +1763,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
return err
}
values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
values, err = s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "host"},
@ -1741,7 +1884,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
errC <- nil
return
default:
names, err := s.MeasurementNames(nil, "db0", nil)
names, err := s.MeasurementNames(context.Background(), nil, "db0", nil)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}
@ -1826,7 +1969,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
errC <- nil
return
default:
keys, err := s.TagKeys(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
keys, err := s.TagKeys(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}
@ -1927,7 +2070,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
}
cond := rewrite.(*influxql.ShowTagValuesStatement).Condition
values, err := s.TagValues(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond)
values, err := s.TagValues(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}
@ -1989,7 +2132,7 @@ func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = store.SeriesCardinality("db")
_, _ = store.SeriesCardinality(context.Background(), "db")
}
})
store.Close()
@ -2071,8 +2214,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
{name: "s=10_m=100_v=1000", shards: 10, measurements: 100, tagValues: 1000},
}
var s *Store
setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids
setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids
s := NewStore(b, index)
if err := s.Open(); err != nil {
panic(err)
@ -2108,13 +2250,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
shardIDs = append(shardIDs, uint64(i))
s.MustCreateShardWithData("db0", "rp0", i, genPoints(i, useRandom)...)
}
return shardIDs
}
teardown := func() {
if err := s.Close(); err != nil {
b.Fatal(err)
}
return s, shardIDs
}
// SHOW TAG VALUES WITH KEY IN ("host", "shard")
@ -2153,14 +2289,19 @@ func BenchmarkStore_TagValues(b *testing.B) {
for useRand := 0; useRand < 2; useRand++ {
for c, condition := range []influxql.Expr{cond1, cond2} {
for _, bm := range benchmarks {
shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1)
s, shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1)
teardown := func() {
if err := s.Close(); err != nil {
b.Fatal(err)
}
}
cnd := "Unfiltered"
if c == 0 {
cnd = "Filtered"
}
b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
if tvResult, err = s.TagValues(nil, shardIDs, condition); err != nil {
if tvResult, err = s.TagValues(context.Background(), nil, shardIDs, condition); err != nil {
b.Fatal(err)
}
}

View File

@ -396,7 +396,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(ctx context.Context
return err
}
names, err := e.TSDBStore.MeasurementNames(ectx.Authorizer, mapping.BucketID.String(), q.Condition)
names, err := e.TSDBStore.MeasurementNames(ctx, ectx.Authorizer, mapping.BucketID.String(), q.Condition)
if err != nil || len(names) == 0 {
return ectx.Send(ctx, &query.Result{
Err: err,
@ -510,7 +510,7 @@ func (e *StatementExecutor) executeShowTagKeys(ctx context.Context, q *influxql.
}
}
tagKeys, err := e.TSDBStore.TagKeys(ectx.Authorizer, shardIDs, cond)
tagKeys, err := e.TSDBStore.TagKeys(ctx, ectx.Authorizer, shardIDs, cond)
if err != nil {
return ectx.Send(ctx, &query.Result{
Err: err,
@ -602,7 +602,7 @@ func (e *StatementExecutor) executeShowTagValues(ctx context.Context, q *influxq
}
}
tagValues, err := e.TSDBStore.TagValues(ectx.Authorizer, shardIDs, cond)
tagValues, err := e.TSDBStore.TagValues(ctx, ectx.Authorizer, shardIDs, cond)
if err != nil {
return ectx.Send(ctx, &query.Result{Err: err})
}
@ -759,9 +759,9 @@ func (m mappings) DefaultRetentionPolicy(db string) string {
type TSDBStore interface {
DeleteMeasurement(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
}
var _ TSDBStore = LocalTSDBStore{}

View File

@ -5,12 +5,13 @@ import (
"context"
"errors"
"fmt"
"github.com/influxdata/influxdb/v2/kit/platform"
"reflect"
"regexp"
"testing"
"time"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/davecgh/go-spew/spew"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
@ -413,11 +414,11 @@ func NewQueryExecutor(t *testing.T, opts ...optFn) *QueryExecutor {
return nil
}
e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
e.TSDBStore.MeasurementNamesFn = func(_ context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return nil, nil
}
e.TSDBStore.TagValuesFn = func(_ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) {
e.TSDBStore.TagValuesFn = func(_ context.Context, _ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) {
return nil, nil
}

View File

@ -28,11 +28,11 @@ var (
)
type TSDBStore interface {
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
ShardGroup(ids []uint64) tsdb.ShardGroup
Shards(ids []uint64) []*tsdb.Shard
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
}
type MetaClient interface {
@ -336,7 +336,7 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur
// TODO(jsternberg): Use a real authorizer.
auth := query.OpenAuthorizer
keys, err := s.TSDBStore.TagKeys(auth, shardIDs, expr)
keys, err := s.TSDBStore.TagKeys(ctx, auth, shardIDs, expr)
if err != nil {
return cursors.EmptyStringIterator, err
}
@ -459,7 +459,7 @@ func (s *Store) tagValues(ctx context.Context, mqAttrs *metaqueryAttributes, tag
// TODO(jsternberg): Use a real authorizer.
auth := query.OpenAuthorizer
values, err := s.TSDBStore.TagValues(auth, shardIDs, mqAttrs.pred)
values, err := s.TSDBStore.TagValues(ctx, auth, shardIDs, mqAttrs.pred)
if err != nil {
return nil, err
}
@ -491,7 +491,7 @@ func (s *Store) MeasurementNames(ctx context.Context, mqAttrs *metaqueryAttribut
// TODO(jsternberg): Use a real authorizer.
auth := query.OpenAuthorizer
values, err := s.TSDBStore.MeasurementNames(auth, mqAttrs.db, mqAttrs.pred)
values, err := s.TSDBStore.MeasurementNames(ctx, auth, mqAttrs.db, mqAttrs.pred)
if err != nil {
return nil, err
}