feat: Make meta queries respect QueryTimeout values

Meta queries (SHOW TAG VALUES, SHOW TAG KEYS, SHOW SERIES CARDINALITY, etc.) do not respect
the QueryTimeout config parameter. Meta queries should check the query context when possible
to allow cancellation and timeout. These checks will not be as frequent as in regular queries,
which use iterators, because meta queries return data in batches.

Add a context.Context parameter to
(*Store).MeasurementNames()
(*Store).MeasurementsCardinality()
(*Store).SeriesCardinality()
(*Store).TagValues()
(*Store).TagKeys()
(*Store).SeriesSketches()
(*Store).MeasurementsSketches()
Each method checks the context for timeout or cancellation,
limiting the time spent in meta queries; see the sketch below.
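
The shared pattern is a non-blocking select on ctx.Done() at each batch boundary. Here is a minimal, self-contained sketch of that check; processBatches is a hypothetical stand-in for one of the meta-query loops, not code from this commit:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// processBatches is a hypothetical stand-in for a meta-query loop such as
// TagKeys or TagValues: the context is checked once per batch rather than
// once per row, which is the pattern this commit adds to the Store methods.
func processBatches(ctx context.Context, batches [][]string) ([]string, error) {
	var out []string
	for _, batch := range batches {
		// Non-blocking check: fall through immediately while the context
		// is live; abort with ctx.Err() once it is cancelled or timed out.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		out = append(out, batch...)
	}
	return out, nil
}

func main() {
	// A zero timeout expires immediately; the tests in this commit use the
	// same trick to exercise the timeout path deterministically.
	ctx, cancel := context.WithTimeout(context.Background(), 0)
	defer cancel()

	_, err := processBatches(ctx, [][]string{{"m1"}, {"m2"}})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```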

https://github.com/influxdata/influxdb/issues/20736
davidby-influx 2021-01-29 15:56:29 -08:00 committed by davidby-influx
parent de1a0eb2a9
commit 092c7a9976
7 changed files with 299 additions and 104 deletions

@ -1,6 +1,7 @@
package run
import (
"context"
"crypto/tls"
"fmt"
"io"
@ -540,14 +541,16 @@ func (s *Server) reportServer() {
for _, db := range dbs {
name := db.Name
n, err := s.TSDBStore.SeriesCardinality(name)
// Use context.Background() to avoid timing out on this.
n, err := s.TSDBStore.SeriesCardinality(context.Background(), name)
if err != nil {
s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err))
} else {
numSeries += n
}
n, err = s.TSDBStore.MeasurementsCardinality(name)
// Use context.Background() to avoid timing out on this.
n, err = s.TSDBStore.MeasurementsCardinality(context.Background(), name)
if err != nil {
s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err))
} else {
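
The two calls above deliberately use context.Background(): server statistics reporting should never be cut short by QueryTimeout. A minimal sketch of the two calling conventions, with seriesCardinality as a hypothetical stand-in for (*tsdb.Store).SeriesCardinality, not the real implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// seriesCardinality is a hypothetical stand-in for
// (*tsdb.Store).SeriesCardinality(ctx, database).
func seriesCardinality(ctx context.Context, database string) (int64, error) {
	if err := ctx.Err(); err != nil {
		return 0, err // cancelled or past its deadline
	}
	return 42, nil
}

func main() {
	// Reporting path: context.Background() never expires.
	n, err := seriesCardinality(context.Background(), "db0")
	fmt.Println(n, err) // 42 <nil>

	// Query path: the context may carry a QueryTimeout-derived deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	n, err = seriesCardinality(ctx, "db0")
	fmt.Println(n, err) // 42 <nil> (well inside the deadline)
}
```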

@ -180,11 +180,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query
case *influxql.ShowMeasurementsStatement:
return e.executeShowMeasurementsStatement(stmt, ctx)
case *influxql.ShowMeasurementCardinalityStatement:
rows, err = e.executeShowMeasurementCardinalityStatement(stmt)
rows, err = e.executeShowMeasurementCardinalityStatement(stmt, ctx)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowSeriesCardinalityStatement:
rows, err = e.executeShowSeriesCardinalityStatement(stmt)
rows, err = e.executeShowSeriesCardinalityStatement(stmt, ctx)
case *influxql.ShowShardsStatement:
rows, err = e.executeShowShardsStatement(stmt)
case *influxql.ShowShardGroupsStatement:
@ -719,7 +719,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
return ErrDatabaseNameRequired
}
names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition)
names, err := e.TSDBStore.MeasurementNames(ctx.Context, ctx.Authorizer, q.Database, q.Condition)
if err != nil || len(names) == 0 {
return ctx.Send(&query.Result{
Err: err,
@ -758,12 +758,12 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
})
}
func (e *StatementExecutor) executeShowMeasurementCardinalityStatement(stmt *influxql.ShowMeasurementCardinalityStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowMeasurementCardinalityStatement(stmt *influxql.ShowMeasurementCardinalityStatement, ctx *query.ExecutionContext) (models.Rows, error) {
if stmt.Database == "" {
return nil, ErrDatabaseNameRequired
}
n, err := e.TSDBStore.MeasurementsCardinality(stmt.Database)
n, err := e.TSDBStore.MeasurementsCardinality(ctx.Context, stmt.Database)
if err != nil {
return nil, err
}
@ -829,12 +829,12 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards
return rows, nil
}
func (e *StatementExecutor) executeShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityStatement, ctx *query.ExecutionContext) (models.Rows, error) {
if stmt.Database == "" {
return nil, ErrDatabaseNameRequired
}
n, err := e.TSDBStore.SeriesCardinality(stmt.Database)
n, err := e.TSDBStore.SeriesCardinality(ctx.Context, stmt.Database)
if err != nil {
return nil, err
}
@ -966,7 +966,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
}
}
tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond)
tagKeys, err := e.TSDBStore.TagKeys(ctx.Context, ctx.Authorizer, shardIDs, cond)
if err != nil {
return ctx.Send(&query.Result{
Err: err,
@ -1053,7 +1053,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
}
}
tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond)
tagValues, err := e.TSDBStore.TagValues(ctx.Context, ctx.Authorizer, shardIDs, cond)
if err != nil {
return ctx.Send(&query.Result{Err: err})
}
@ -1374,12 +1374,12 @@ type TSDBStore interface {
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
SeriesCardinality(database string) (int64, error)
MeasurementsCardinality(database string) (int64, error)
SeriesCardinality(ctx context.Context, database string) (int64, error)
MeasurementsCardinality(ctx context.Context, database string) (int64, error)
}
var _ TSDBStore = LocalTSDBStore{}
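
With context.Context added to the TSDBStore interface, the ExecutionContext that statement handlers already receive can flow straight through to the store. A sketch of that plumbing, under the assumption that the caller derives the deadline from the configured QueryTimeout; metaStore and fakeStore are hypothetical, and only the method signature matches the interface above:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// metaStore is the slice of the TSDBStore interface exercised here; the
// signature matches MeasurementsCardinality as declared above.
type metaStore interface {
	MeasurementsCardinality(ctx context.Context, database string) (int64, error)
}

type fakeStore struct{}

func (fakeStore) MeasurementsCardinality(ctx context.Context, database string) (int64, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	return 7, nil
}

func main() {
	// Assumed for illustration: QueryTimeout as set in the config.
	queryTimeout := 10 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), queryTimeout)
	defer cancel()

	var store metaStore = fakeStore{}
	n, err := store.MeasurementsCardinality(ctx, "db0")
	fmt.Println(n, err) // 7 <nil>
}
```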

@ -1,6 +1,7 @@
package internal
import (
"context"
"io"
"time"
@ -92,13 +93,13 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source
func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
return s.ImportShardFn(id, r)
}
func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(auth, database, cond)
}
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) {
return s.MeasurementSeriesCountsFn(database)
}
func (s *TSDBStoreMock) MeasurementsCardinality(database string) (int64, error) {
func (s *TSDBStoreMock) MeasurementsCardinality(ctx context.Context, database string) (int64, error) {
return s.MeasurementsCardinalityFn(database)
}
func (s *TSDBStoreMock) Open() error {
@ -110,7 +111,7 @@ func (s *TSDBStoreMock) Path() string {
func (s *TSDBStoreMock) RestoreShard(id uint64, r io.Reader) error {
return s.RestoreShardFn(id, r)
}
func (s *TSDBStoreMock) SeriesCardinality(database string) (int64, error) {
func (s *TSDBStoreMock) SeriesCardinality(ctx context.Context, database string) (int64, error) {
return s.SeriesCardinalityFn(database)
}
func (s *TSDBStoreMock) SetShardEnabled(shardID uint64, enabled bool) error {
@ -137,10 +138,10 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard {
func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic {
return s.StatisticsFn(tags)
}
func (s *TSDBStoreMock) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(auth, shardIDs, cond)
}
func (s *TSDBStoreMock) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(auth, shardIDs, cond)
}
func (s *TSDBStoreMock) WithLogger(log *zap.Logger) {

@ -230,7 +230,7 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur
// TODO(jsternberg): Use a real authorizer.
auth := query.OpenAuthorizer
keys, err := s.TSDBStore.TagKeys(auth, shardIDs, expr)
keys, err := s.TSDBStore.TagKeys(ctx, auth, shardIDs, expr)
if err != nil {
return nil, err
}
@ -326,7 +326,7 @@ func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest)
// TODO(jsternberg): Use a real authorizer.
auth := query.OpenAuthorizer
values, err := s.TSDBStore.TagValues(auth, shardIDs, expr)
values, err := s.TSDBStore.TagValues(ctx, auth, shardIDs, expr)
if err != nil {
return nil, err
}
@ -385,7 +385,7 @@ func (s *Store) MeasurementNames(ctx context.Context, req *MeasurementNamesReque
// TODO(jsternberg): Use a real authorizer.
auth := query.OpenAuthorizer
values, err := s.TSDBStore.MeasurementNames(auth, database, expr)
values, err := s.TSDBStore.MeasurementNames(ctx, auth, database, expr)
if err != nil {
return nil, err
}

@ -1,6 +1,7 @@
package tests
import (
"context"
"fmt"
"strings"
"sync"
@ -95,7 +96,7 @@ func TestConcurrentServer_TagValues(t *testing.T) {
ids = append(ids, si.ID)
}
}
srv.TSDBStore.TagValues(nil, ids, cond)
srv.TSDBStore.TagValues(context.Background(), nil, ids, cond)
}
var f3 = func() { s.DropDatabase("db0") }
@ -133,7 +134,7 @@ func TestConcurrentServer_ShowMeasurements(t *testing.T) {
if !ok {
t.Fatal("Not a local server")
}
srv.TSDBStore.MeasurementNames(query.OpenAuthorizer, "db0", nil)
srv.TSDBStore.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
}
runTest(10*time.Second, f1, f2)

@ -2,6 +2,7 @@ package tsdb // import "github.com/influxdata/influxdb/tsdb"
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@ -147,19 +148,18 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
s.mu.RLock()
shards := s.shardsSlice()
s.mu.RUnlock()
// Add all the series and measurements cardinality estimations.
databases := s.Databases()
statistics := make([]models.Statistic, 0, len(databases))
for _, database := range databases {
log := s.Logger.With(logger.Database(database))
sc, err := s.SeriesCardinality(database)
sc, err := s.SeriesCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve series cardinality", zap.Error(err))
continue
}
mc, err := s.MeasurementsCardinality(database)
mc, err := s.MeasurementsCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve measurement cardinality", zap.Error(err))
continue
@ -1180,7 +1180,7 @@ func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (est
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
//
func (s *Store) SeriesCardinality(database string) (int64, error) {
func (s *Store) SeriesCardinality(ctx context.Context, database string) (int64, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
@ -1188,23 +1188,35 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
var setMu sync.Mutex
others := make([]*SeriesIDSet, 0, len(shards))
s.walkShards(shards, func(sh *Shard) error {
index, err := sh.Index()
if err != nil {
return err
err := s.walkShards(shards, func(sh *Shard) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
index, err := sh.Index()
if err != nil {
return err
}
seriesIDs := index.SeriesIDSet()
setMu.Lock()
others = append(others, seriesIDs)
setMu.Unlock()
return nil
}
seriesIDs := index.SeriesIDSet()
setMu.Lock()
others = append(others, seriesIDs)
setMu.Unlock()
return nil
})
if err != nil {
return 0, err
}
ss := NewSeriesIDSet()
ss.Merge(others...)
return int64(ss.Cardinality()), nil
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
return int64(ss.Cardinality()), nil
}
}
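Note the two checkpoints above: one per shard inside the walkShards callback, and one more before the merged bitset is counted. A simplified, sequential sketch of the per-shard check follows; the real walkShards visits shards concurrently, and walk and the shard IDs here are hypothetical:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// walk is a hypothetical, sequential stand-in for (*Store).walkShards:
// it applies fn to each shard ID and stops at the first error.
func walk(shardIDs []uint64, fn func(uint64) error) error {
	for _, id := range shardIDs {
		if err := fn(id); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the client cancelling the query mid-flight

	err := walk([]uint64{1, 2, 3}, func(id uint64) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // surfaces context.Canceled
		default:
			fmt.Println("collected series IDs for shard", id)
			return nil
		}
	})
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```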
// SeriesSketches returns the sketches associated with the series data in all
@ -1212,8 +1224,13 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
func (s *Store) SeriesSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
@ -1226,13 +1243,8 @@ func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Ske
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
})
func (s *Store) MeasurementsCardinality(ctx context.Context, database string) (int64, error) {
ss, ts, err := s.MeasurementsSketches(ctx, database)
if err != nil {
return 0, err
@ -1245,12 +1257,18 @@ func (s *Store) MeasurementsCardinality(database string) (int64, error) {
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
func (s *Store) MeasurementsSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
// Check for timeout on every iteration.
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
}
return sh.MeasurementsSketches()
})
}
@ -1547,7 +1565,7 @@ func (s *Store) WriteToShard(writeCtx WriteContext, shardID uint64, points []mod
// MeasurementNames returns a slice of all measurements. It accepts an
// optional condition expression. If cond is nil, then all measurements for the
// database will be returned.
func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
@ -1567,6 +1585,11 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in
is.Indexes = append(is.Indexes, index)
}
is = is.DedupeInmemIndexes()
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return is.MeasurementNamesByExpr(auth, cond)
}
@ -1589,7 +1612,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }
// TagKeys returns the tag keys in the given database, matching the condition.
func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
if len(shardIDs) == 0 {
return nil, nil
}
@ -1662,6 +1685,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
var results []TagKeys
for _, name := range names {
// Check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Build keyset over all indexes for measurement.
tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil)
if err != nil {
@ -1675,6 +1704,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
// If they have authorized series associated with them.
if filterExpr == nil {
for tagKey := range tagKeySet {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey))
if err != nil {
return nil, err
@ -1755,7 +1790,7 @@ func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[
// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
if cond == nil {
return nil, errors.New("a condition is required")
}
@ -1844,6 +1879,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
// values from matching series. Series may be filtered using a WHERE
// filter.
for _, name := range names {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Determine a list of keys from condition.
keySet, err := is.MeasurementTagKeysByExpr(name, cond)
if err != nil {
@ -1906,6 +1948,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
// instances of tagValues for a given measurement.
idxBuf := make([][2]int, 0, len(is.Indexes))
for i < len(allResults) {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Gather all occurrences of the same measurement for merging.
for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) {
j++

@ -288,13 +288,12 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
t.Fatal(err)
}
}
err := s.DeleteMeasurement("db0", "cpu")
if err != nil {
t.Fatal(err)
}
measurements, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil)
measurements, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
if err != nil {
t.Fatal(err)
}
@ -354,7 +353,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
wg.Wait()
keys, err := s.TagKeys(nil, []uint64{1, 2}, nil)
keys, err := s.TagKeys(context.Background(), nil, []uint64{1, 2}, nil)
if err != nil {
t.Fatal(err)
}
@ -454,7 +453,7 @@ func TestStore_DeleteShard(t *testing.T) {
// cpu,serverb=b should be removed from the series file for db0 because
// shard 1 was the only owner of that series.
// Verify by getting all tag keys.
keys, err := s.TagKeys(nil, []uint64{2}, nil)
keys, err := s.TagKeys(context.Background(), nil, []uint64{2}, nil)
if err != nil {
return err
}
@ -469,7 +468,7 @@ func TestStore_DeleteShard(t *testing.T) {
// Verify that the same series was not removed from other databases'
// series files.
if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil {
if keys, err = s.TagKeys(context.Background(), nil, []uint64{3}, nil); err != nil {
return err
}
@ -867,7 +866,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
`cpu value=3 20`,
)
meas, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil)
meas, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
if err != nil {
t.Fatalf("unexpected error with MeasurementNames: %v", err)
}
@ -908,7 +907,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
}
// Delete all the series for each measurement.
mnames, err := store.MeasurementNames(nil, "db", nil)
mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil)
if err != nil {
t.Fatal(err)
}
@ -920,7 +919,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -932,7 +931,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
// Since all the series have been deleted, all the measurements should have
// been removed from the index too.
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
t.Fatal(err)
}
@ -986,7 +985,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -997,7 +996,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) {
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
t.Fatal(err)
}
@ -1068,7 +1067,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -1079,7 +1078,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) {
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
t.Fatal(err)
}
@ -1112,6 +1111,150 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
}
}
func TestStore_MetaQuery_Timeout(t *testing.T) {
if testing.Short() || os.Getenv("APPVEYOR") != "" {
t.Skip("Skipping test in short and appveyor mode.")
}
test := func(index string) {
store := NewStore(index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}
defer store.Close()
testStoreMetaQueryTimeout(t, store, index)
}
for _, index := range tsdb.RegisteredIndexes() {
test(index)
}
}
func testStoreMetaQueryTimeout(t *testing.T, store *Store, index string) {
shards := testStoreMetaQuerySetup(t, store)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "SeriesCardinality"
_, err := store.Store.SeriesCardinality(ctx, "db")
return funcName, err
}, index)(t)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "MeasurementsCardinality"
_, err := store.Store.MeasurementsCardinality(ctx, "db")
return funcName, err
}, index)(t)
keyCondition, allCondition := testStoreMetaQueryCondition()
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "TagValues"
_, err := store.Store.TagValues(ctx, nil, shards, allCondition)
return funcName, err
}, index)(t)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "TagKeys"
_, err := store.Store.TagKeys(ctx, nil, shards, keyCondition)
return funcName, err
}, index)(t)
testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
const funcName = "MeasurementNames"
_, err := store.Store.MeasurementNames(ctx, nil, "db", nil)
return funcName, err
}, index)(t)
}
func testStoreMetaQueryCondition() (influxql.Expr, influxql.Expr) {
keyCondition := &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.OR,
LHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "tagKey4"},
},
RHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "tagKey5"},
},
},
}
whereCondition := &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "tagKey1"},
RHS: &influxql.StringLiteral{Val: "tagValue2"},
},
},
RHS: keyCondition,
},
}
allCondition := &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.ParenExpr{
Expr: &influxql.BinaryExpr{
Op: influxql.EQREGEX,
LHS: &influxql.VarRef{Val: "tagKey3"},
RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`tagValue\d`)},
},
},
RHS: whereCondition,
}
return keyCondition, allCondition
}
func testStoreMetaQuerySetup(t *testing.T, store *Store) []uint64 {
const measurementCnt = 64
const tagCnt = 5
const valueCnt = 5
const pointsPerShard = 20000
// Generate point data to write to the shards.
series := genTestSeries(measurementCnt, tagCnt, valueCnt)
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
// shards such that we never write the same series to multiple shards.
shards := make([]uint64, len(points)/pointsPerShard)
for shardID := 0; shardID < len(points)/pointsPerShard; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
t.Fatalf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points[shardID*pointsPerShard:(shardID+1)*pointsPerShard]); err != nil {
t.Fatalf("batch write: %s", err)
}
shards[shardID] = uint64(shardID)
}
return shards
}
func testStoreMakeTimedFuncs(tested func(context.Context) (string, error), index string) func(*testing.T) {
cancelTested := func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(0))
defer cancel()
funcName, err := tested(ctx)
if err == nil {
t.Fatalf("%v: failed to time out with index type %v", funcName, index)
} else if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
t.Fatalf("%v: failed with %v instead of %v with index type %v", funcName, err, context.DeadlineExceeded, index)
}
}
return cancelTested
}
// Creates a large number of series in multiple shards, which will force
// compactions to occur.
func testStoreCardinalityCompactions(store *Store) error {
@ -1137,7 +1280,7 @@ func testStoreCardinalityCompactions(store *Store) error {
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
return err
}
@ -1148,7 +1291,7 @@ func testStoreCardinalityCompactions(store *Store) error {
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
return err
}
@ -1230,7 +1373,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
}
// Get updated series cardinality from store after writing data.
cardinality, err := store.Store.SeriesCardinality("db")
cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
if err != nil {
t.Fatal(err)
}
@ -1249,7 +1392,7 @@ func TestStore_Sketches(t *testing.T) {
checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error {
// Get sketches and check cardinality...
sketch, tsketch, err := store.SeriesSketches("db")
sketch, tsketch, err := store.SeriesSketches(context.Background(), "db")
if err != nil {
return err
}
@ -1275,7 +1418,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Check measurement cardinality.
if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil {
if sketch, tsketch, err = store.MeasurementsSketches(context.Background(), "db"); err != nil {
return err
}
@ -1329,7 +1472,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Delete half the measurements data
mnames, err := store.MeasurementNames(nil, "db", nil)
mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil)
if err != nil {
return err
}
@ -1462,9 +1605,8 @@ func TestStore_TagValues(t *testing.T) {
},
}
var s *Store
setup := func(index string) []uint64 { // returns shard ids
s = MustOpenStore(index)
setup := func(index string) (*Store, []uint64) { // returns shard ids
s := MustOpenStore(index)
fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
cpu1%[1]d,host=nofoo value=1 %[4]d
@ -1489,14 +1631,14 @@ func TestStore_TagValues(t *testing.T) {
ids = append(ids, uint64(i))
s.MustCreateShardWithData("db0", "rp0", i, genPoints(i)...)
}
return ids
return s, ids
}
for _, example := range examples {
for _, index := range tsdb.RegisteredIndexes() {
shardIDs := setup(index)
s, shardIDs := setup(index)
t.Run(example.Name+"_"+index, func(t *testing.T) {
got, err := s.TagValues(nil, shardIDs, example.Expr)
got, err := s.TagValues(context.Background(), nil, shardIDs, example.Expr)
if err != nil {
t.Fatal(err)
}
@ -1538,7 +1680,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
},
}
names, err := s.MeasurementNames(authorizer, "db0", nil)
names, err := s.MeasurementNames(context.Background(), authorizer, "db0", nil)
if err != nil {
return err
}
@ -1568,7 +1710,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
return err
}
if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil {
if names, err = s.MeasurementNames(context.Background(), authorizer, "db0", nil); err != nil {
return err
}
@ -1625,7 +1767,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
},
}
keys, err := s.TagKeys(authorizer, []uint64{0}, nil)
keys, err := s.TagKeys(context.Background(), authorizer, []uint64{0}, nil)
if err != nil {
return err
}
@ -1660,7 +1802,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
return err
}
if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil {
if keys, err = s.TagKeys(context.Background(), authorizer, []uint64{0}, nil); err != nil {
return err
}
@ -1723,7 +1865,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
},
}
values, err := s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
values, err := s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "host"},
@ -1763,7 +1905,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
return err
}
values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
values, err = s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "host"},
@ -1884,7 +2026,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
errC <- nil
return
default:
names, err := s.MeasurementNames(nil, "db0", nil)
names, err := s.MeasurementNames(context.Background(), nil, "db0", nil)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}
@ -1969,7 +2111,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
errC <- nil
return
default:
keys, err := s.TagKeys(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
keys, err := s.TagKeys(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}
@ -2070,7 +2212,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
}
cond := rewrite.(*influxql.ShowTagValuesStatement).Condition
values, err := s.TagValues(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond)
values, err := s.TagValues(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}
@ -2132,7 +2274,7 @@ func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = store.SeriesCardinality("db")
_, _ = store.SeriesCardinality(context.Background(), "db")
}
})
store.Close()
@ -2214,8 +2356,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
{name: "s=10_m=100_v=1000", shards: 10, measurements: 100, tagValues: 1000},
}
var s *Store
setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids
setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids
s := NewStore(index)
if err := s.Open(); err != nil {
panic(err)
@ -2251,13 +2392,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
shardIDs = append(shardIDs, uint64(i))
s.MustCreateShardWithData("db0", "rp0", i, genPoints(i, useRandom)...)
}
return shardIDs
}
teardown := func() {
if err := s.Close(); err != nil {
b.Fatal(err)
}
return s, shardIDs
}
// SHOW TAG VALUES WITH KEY IN ("host", "shard")
@ -2296,14 +2431,20 @@ func BenchmarkStore_TagValues(b *testing.B) {
for useRand := 0; useRand < 2; useRand++ {
for c, condition := range []influxql.Expr{cond1, cond2} {
for _, bm := range benchmarks {
shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1)
s, shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1)
teardown := func() {
if err := s.Close(); err != nil {
b.Fatal(err)
}
}
cnd := "Unfiltered"
if c == 0 {
cnd = "Filtered"
}
b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
if tvResult, err = s.TagValues(nil, shardIDs, condition); err != nil {
if tvResult, err = s.TagValues(context.Background(), nil, shardIDs, condition); err != nil {
b.Fatal(err)
}
}