From 092c7a997604a78c21e217a12a23675efba9006e Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Fri, 29 Jan 2021 15:56:29 -0800 Subject: [PATCH] feat: Make meta queries respect QueryTimeout values Meta queries (SHOW TAG VALUES, SHOW TAG KEYS, SHOW SERIES CARDINALITY, etc.) do not respect the QueryTimeout config parameter. Meta queries should check the query context when possible to allow cancellation and timeout. This will not be as frequent as regular queries, which use iterators, because meta queries return data in batches. Add a context.Context to (*Store).MeasurementNames() (*Store).MeasurementsCardinality() (*Store).SeriesCardinality() (*Store).TagValues() (*Store).TagKeys() (*Store).SeriesSketches() (*Store).MeasurementsSketches() which is tested for timeout or cancellation to allow limitation of time spent in meta queries https://github.com/influxdata/influxdb/issues/20736 --- cmd/influxd/run/server.go | 7 +- coordinator/statement_executor.go | 28 ++-- internal/tsdb_store.go | 11 +- services/storage/store.go | 6 +- tests/server_concurrent_test.go | 5 +- tsdb/store.go | 113 +++++++++++---- tsdb/store_test.go | 233 ++++++++++++++++++++++++------ 7 files changed, 299 insertions(+), 104 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 6030670d24..49051a088a 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -1,6 +1,7 @@ package run import ( + "context" "crypto/tls" "fmt" "io" @@ -540,14 +541,16 @@ func (s *Server) reportServer() { for _, db := range dbs { name := db.Name - n, err := s.TSDBStore.SeriesCardinality(name) + // Use the context.Background() to avoid timing out on this. + n, err := s.TSDBStore.SeriesCardinality(context.Background(), name) if err != nil { s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err)) } else { numSeries += n } - n, err = s.TSDBStore.MeasurementsCardinality(name) + // Use the context.Background() to avoid timing out on this. + n, err = s.TSDBStore.MeasurementsCardinality(context.Background(), name) if err != nil { s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err)) } else { diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index be32067562..dd5a1b5a35 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -180,11 +180,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query case *influxql.ShowMeasurementsStatement: return e.executeShowMeasurementsStatement(stmt, ctx) case *influxql.ShowMeasurementCardinalityStatement: - rows, err = e.executeShowMeasurementCardinalityStatement(stmt) + rows, err = e.executeShowMeasurementCardinalityStatement(stmt, ctx) case *influxql.ShowRetentionPoliciesStatement: rows, err = e.executeShowRetentionPoliciesStatement(stmt) case *influxql.ShowSeriesCardinalityStatement: - rows, err = e.executeShowSeriesCardinalityStatement(stmt) + rows, err = e.executeShowSeriesCardinalityStatement(stmt, ctx) case *influxql.ShowShardsStatement: rows, err = e.executeShowShardsStatement(stmt) case *influxql.ShowShardGroupsStatement: @@ -719,7 +719,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea return ErrDatabaseNameRequired } - names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition) + names, err := e.TSDBStore.MeasurementNames(ctx.Context, ctx.Authorizer, q.Database, q.Condition) if err != nil || len(names) == 0 { return ctx.Send(&query.Result{ Err: err, @@ -758,12 +758,12 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea }) } -func (e *StatementExecutor) executeShowMeasurementCardinalityStatement(stmt *influxql.ShowMeasurementCardinalityStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowMeasurementCardinalityStatement(stmt *influxql.ShowMeasurementCardinalityStatement, ctx *query.ExecutionContext) (models.Rows, error) { if stmt.Database == "" { return nil, ErrDatabaseNameRequired } - n, err := e.TSDBStore.MeasurementsCardinality(stmt.Database) + n, err := e.TSDBStore.MeasurementsCardinality(ctx.Context, stmt.Database) if err != nil { return nil, err } @@ -829,12 +829,12 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards return rows, nil } -func (e *StatementExecutor) executeShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityStatement, ctx *query.ExecutionContext) (models.Rows, error) { if stmt.Database == "" { return nil, ErrDatabaseNameRequired } - n, err := e.TSDBStore.SeriesCardinality(stmt.Database) + n, err := e.TSDBStore.SeriesCardinality(ctx.Context, stmt.Database) if err != nil { return nil, err } @@ -966,7 +966,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, } } - tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond) + tagKeys, err := e.TSDBStore.TagKeys(ctx.Context, ctx.Authorizer, shardIDs, cond) if err != nil { return ctx.Send(&query.Result{ Err: err, @@ -1053,7 +1053,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem } } - tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond) + tagValues, err := e.TSDBStore.TagValues(ctx.Context, ctx.Authorizer, shardIDs, cond) if err != nil { return ctx.Send(&query.Result{Err: err}) } @@ -1374,12 +1374,12 @@ type TSDBStore interface { DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error - MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) - TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) - SeriesCardinality(database string) (int64, error) - MeasurementsCardinality(database string) (int64, error) + SeriesCardinality(ctx context.Context, database string) (int64, error) + MeasurementsCardinality(ctx context.Context, database string) (int64, error) } var _ TSDBStore = LocalTSDBStore{} diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index de340c3f7f..5ab81201b1 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -1,6 +1,7 @@ package internal import ( + "context" "io" "time" @@ -92,13 +93,13 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error { return s.ImportShardFn(id, r) } -func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { +func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { return s.MeasurementNamesFn(auth, database, cond) } func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measuments int, series int) { return s.MeasurementSeriesCountsFn(database) } -func (s *TSDBStoreMock) MeasurementsCardinality(database string) (int64, error) { +func (s *TSDBStoreMock) MeasurementsCardinality(ctx context.Context, database string) (int64, error) { return s.MeasurementsCardinalityFn(database) } func (s *TSDBStoreMock) Open() error { @@ -110,7 +111,7 @@ func (s *TSDBStoreMock) Path() string { func (s *TSDBStoreMock) RestoreShard(id uint64, r io.Reader) error { return s.RestoreShardFn(id, r) } -func (s *TSDBStoreMock) SeriesCardinality(database string) (int64, error) { +func (s *TSDBStoreMock) SeriesCardinality(ctx context.Context, database string) (int64, error) { return s.SeriesCardinalityFn(database) } func (s *TSDBStoreMock) SetShardEnabled(shardID uint64, enabled bool) error { @@ -137,10 +138,10 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard { func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic { return s.StatisticsFn(tags) } -func (s *TSDBStoreMock) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) { +func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) { return s.TagKeysFn(auth, shardIDs, cond) } -func (s *TSDBStoreMock) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) { +func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) { return s.TagValuesFn(auth, shardIDs, cond) } func (s *TSDBStoreMock) WithLogger(log *zap.Logger) { diff --git a/services/storage/store.go b/services/storage/store.go index e2310b85ac..60ef0981dc 100644 --- a/services/storage/store.go +++ b/services/storage/store.go @@ -230,7 +230,7 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - keys, err := s.TSDBStore.TagKeys(auth, shardIDs, expr) + keys, err := s.TSDBStore.TagKeys(ctx, auth, shardIDs, expr) if err != nil { return nil, err } @@ -326,7 +326,7 @@ func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - values, err := s.TSDBStore.TagValues(auth, shardIDs, expr) + values, err := s.TSDBStore.TagValues(ctx, auth, shardIDs, expr) if err != nil { return nil, err } @@ -385,7 +385,7 @@ func (s *Store) MeasurementNames(ctx context.Context, req *MeasurementNamesReque // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - values, err := s.TSDBStore.MeasurementNames(auth, database, expr) + values, err := s.TSDBStore.MeasurementNames(ctx, auth, database, expr) if err != nil { return nil, err } diff --git a/tests/server_concurrent_test.go b/tests/server_concurrent_test.go index ac9e989ee4..8b6178958c 100644 --- a/tests/server_concurrent_test.go +++ b/tests/server_concurrent_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "strings" "sync" @@ -95,7 +96,7 @@ func TestConcurrentServer_TagValues(t *testing.T) { ids = append(ids, si.ID) } } - srv.TSDBStore.TagValues(nil, ids, cond) + srv.TSDBStore.TagValues(context.Background(), nil, ids, cond) } var f3 = func() { s.DropDatabase("db0") } @@ -133,7 +134,7 @@ func TestConcurrentServer_ShowMeasurements(t *testing.T) { if !ok { t.Fatal("Not a local server") } - srv.TSDBStore.MeasurementNames(query.OpenAuthorizer, "db0", nil) + srv.TSDBStore.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) } runTest(10*time.Second, f1, f2) diff --git a/tsdb/store.go b/tsdb/store.go index 73e85fed24..8924548549 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -2,6 +2,7 @@ package tsdb // import "github.com/influxdata/influxdb/tsdb" import ( "bytes" + "context" "errors" "fmt" "io" @@ -147,19 +148,18 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { s.mu.RLock() shards := s.shardsSlice() s.mu.RUnlock() - // Add all the series and measurements cardinality estimations. databases := s.Databases() statistics := make([]models.Statistic, 0, len(databases)) for _, database := range databases { log := s.Logger.With(logger.Database(database)) - sc, err := s.SeriesCardinality(database) + sc, err := s.SeriesCardinality(context.Background(), database) if err != nil { log.Info("Cannot retrieve series cardinality", zap.Error(err)) continue } - mc, err := s.MeasurementsCardinality(database) + mc, err := s.MeasurementsCardinality(context.Background(), database) if err != nil { log.Info("Cannot retrieve measurement cardinality", zap.Error(err)) continue @@ -1180,7 +1180,7 @@ func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (est // Cardinality is calculated exactly by unioning all shards' bitsets of series // IDs. The result of this method cannot be combined with any other results. // -func (s *Store) SeriesCardinality(database string) (int64, error) { +func (s *Store) SeriesCardinality(ctx context.Context, database string) (int64, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() @@ -1188,23 +1188,35 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { var setMu sync.Mutex others := make([]*SeriesIDSet, 0, len(shards)) - s.walkShards(shards, func(sh *Shard) error { - index, err := sh.Index() - if err != nil { - return err + err := s.walkShards(shards, func(sh *Shard) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + index, err := sh.Index() + if err != nil { + return err + } + + seriesIDs := index.SeriesIDSet() + setMu.Lock() + others = append(others, seriesIDs) + setMu.Unlock() + + return nil } - - seriesIDs := index.SeriesIDSet() - setMu.Lock() - others = append(others, seriesIDs) - setMu.Unlock() - - return nil }) - + if err != nil { + return 0, err + } ss := NewSeriesIDSet() ss.Merge(others...) - return int64(ss.Cardinality()), nil + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + return int64(ss.Cardinality()), nil + } } // SeriesSketches returns the sketches associated with the series data in all @@ -1212,8 +1224,13 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { // // The returned sketches can be combined with other sketches to provide an // estimation across distributed databases. -func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) { +func (s *Store) SeriesSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) { return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } if sh == nil { return nil, nil, errors.New("shard nil, can't get cardinality") } @@ -1226,13 +1243,8 @@ func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Ske // // Cardinality is calculated using a sketch-based estimation. The result of this // method cannot be combined with any other results. -func (s *Store) MeasurementsCardinality(database string) (int64, error) { - ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { - if sh == nil { - return nil, nil, errors.New("shard nil, can't get cardinality") - } - return sh.MeasurementsSketches() - }) +func (s *Store) MeasurementsCardinality(ctx context.Context, database string) (int64, error) { + ss, ts, err := s.MeasurementsSketches(ctx, database) if err != nil { return 0, err @@ -1245,12 +1257,18 @@ func (s *Store) MeasurementsCardinality(database string) (int64, error) { // // The returned sketches can be combined with other sketches to provide an // estimation across distributed databases. -func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) { +func (s *Store) MeasurementsSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) { return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { - if sh == nil { - return nil, nil, errors.New("shard nil, can't get cardinality") + // every iteration, check for timeout. + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + if sh == nil { + return nil, nil, errors.New("shard nil, can't get cardinality") + } + return sh.MeasurementsSketches() } - return sh.MeasurementsSketches() }) } @@ -1547,7 +1565,7 @@ func (s *Store) WriteToShard(writeCtx WriteContext, shardID uint64, points []mod // MeasurementNames returns a slice of all measurements. Measurements accepts an // optional condition expression. If cond is nil, then all measurements for the // database will be returned. -func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { +func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() @@ -1567,6 +1585,11 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in is.Indexes = append(is.Indexes, index) } is = is.DedupeInmemIndexes() + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } return is.MeasurementNamesByExpr(auth, cond) } @@ -1589,7 +1612,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement } // TagKeys returns the tag keys in the given database, matching the condition. -func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) { +func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) { if len(shardIDs) == 0 { return nil, nil } @@ -1662,6 +1685,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql. var results []TagKeys for _, name := range names { + // Check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } // Build keyset over all indexes for measurement. tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil) if err != nil { @@ -1675,6 +1704,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql. // If they have authorized series associated with them. if filterExpr == nil { for tagKey := range tagKeySet { + // check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey)) if err != nil { return nil, err @@ -1755,7 +1790,7 @@ func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[ // TagValues returns the tag keys and values for the provided shards, where the // tag values satisfy the provided condition. -func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) { +func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) { if cond == nil { return nil, errors.New("a condition is required") } @@ -1844,6 +1879,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq // values from matching series. Series may be filtered using a WHERE // filter. for _, name := range names { + // check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // Determine a list of keys from condition. keySet, err := is.MeasurementTagKeysByExpr(name, cond) if err != nil { @@ -1906,6 +1948,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq // instances of tagValues for a given measurement. idxBuf := make([][2]int, 0, len(is.Indexes)) for i < len(allResults) { + // check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // Gather all occurrences of the same measurement for merging. for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) { j++ diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 30dd8d2c49..64487d360d 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -288,13 +288,12 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) { t.Fatal(err) } } - err := s.DeleteMeasurement("db0", "cpu") if err != nil { t.Fatal(err) } - measurements, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil) + measurements, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) if err != nil { t.Fatal(err) } @@ -354,7 +353,7 @@ func TestStore_WriteMixedShards(t *testing.T) { wg.Wait() - keys, err := s.TagKeys(nil, []uint64{1, 2}, nil) + keys, err := s.TagKeys(context.Background(), nil, []uint64{1, 2}, nil) if err != nil { t.Fatal(err) } @@ -454,7 +453,7 @@ func TestStore_DeleteShard(t *testing.T) { // cpu,serverb=b should be removed from the series file for db0 because // shard 1 was the only owner of that series. // Verify by getting all tag keys. - keys, err := s.TagKeys(nil, []uint64{2}, nil) + keys, err := s.TagKeys(context.Background(), nil, []uint64{2}, nil) if err != nil { return err } @@ -469,7 +468,7 @@ func TestStore_DeleteShard(t *testing.T) { // Verify that the same series was not removed from other databases' // series files. - if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil { + if keys, err = s.TagKeys(context.Background(), nil, []uint64{3}, nil); err != nil { return err } @@ -867,7 +866,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) { `cpu value=3 20`, ) - meas, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil) + meas, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) if err != nil { t.Fatalf("unexpected error with MeasurementNames: %v", err) } @@ -908,7 +907,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { } // Delete all the series for each measurement. - mnames, err := store.MeasurementNames(nil, "db", nil) + mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil) if err != nil { t.Fatal(err) } @@ -920,7 +919,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -932,7 +931,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { // Since all the series have been deleted, all the measurements should have // been removed from the index too. - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { t.Fatal(err) } @@ -986,7 +985,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -997,7 +996,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) { } // Estimate the measurement cardinality... - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { t.Fatal(err) } @@ -1068,7 +1067,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -1079,7 +1078,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) { } // Estimate the measurement cardinality... - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { t.Fatal(err) } @@ -1112,6 +1111,150 @@ func TestStore_Cardinality_Duplicates(t *testing.T) { } } +func TestStore_MetaQuery_Timeout(t *testing.T) { + if testing.Short() || os.Getenv("APPVEYOR") != "" { + t.Skip("Skipping test in short and appveyor mode.") + } + + test := func(index string) { + store := NewStore(index) + store.EngineOptions.Config.MaxSeriesPerDatabase = 0 + if err := store.Open(); err != nil { + panic(err) + } + defer store.Close() + testStoreMetaQueryTimeout(t, store, index) + } + + for _, index := range tsdb.RegisteredIndexes() { + test(index) + } +} + +func testStoreMetaQueryTimeout(t *testing.T, store *Store, index string) { + shards := testStoreMetaQuerySetup(t, store) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "SeriesCardinality" + _, err := store.Store.SeriesCardinality(ctx, "db") + return funcName, err + }, index)(t) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "MeasurementsCardinality" + _, err := store.Store.MeasurementsCardinality(ctx, "db") + return funcName, err + }, index)(t) + + keyCondition, allCondition := testStoreMetaQueryCondition() + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "TagValues" + _, err := store.Store.TagValues(ctx, nil, shards, allCondition) + return funcName, err + }, index)(t) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "TagKeys" + _, err := store.Store.TagKeys(ctx, nil, shards, keyCondition) + return funcName, err + }, index)(t) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "MeasurementNames" + _, err := store.Store.MeasurementNames(ctx, nil, "db", nil) + return funcName, err + }, index)(t) +} + +func testStoreMetaQueryCondition() (influxql.Expr, influxql.Expr) { + keyCondition := &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.OR, + LHS: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "_tagKey"}, + RHS: &influxql.StringLiteral{Val: "tagKey4"}, + }, + RHS: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "_tagKey"}, + RHS: &influxql.StringLiteral{Val: "tagKey5"}, + }, + }, + } + + whereCondition := &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "tagKey1"}, + RHS: &influxql.StringLiteral{Val: "tagValue2"}, + }, + }, + RHS: keyCondition, + }, + } + + allCondition := &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.EQREGEX, + LHS: &influxql.VarRef{Val: "tagKey3"}, + RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`tagValue\d`)}, + }, + }, + RHS: whereCondition, + } + return keyCondition, allCondition +} + +func testStoreMetaQuerySetup(t *testing.T, store *Store) []uint64 { + const measurementCnt = 64 + const tagCnt = 5 + const valueCnt = 5 + const pointsPerShard = 20000 + + // Generate point data to write to the shards. + series := genTestSeries(measurementCnt, tagCnt, valueCnt) + + points := make([]models.Point, 0, len(series)) + for _, s := range series { + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) + } + // Create requested number of shards in the store & write points across + // shards such that we never write the same series to multiple shards. + shards := make([]uint64, len(points)/pointsPerShard) + for shardID := 0; shardID < len(points)/pointsPerShard; shardID++ { + if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { + t.Fatalf("create shard: %s", err) + } + if err := store.BatchWrite(shardID, points[shardID*pointsPerShard:(shardID+1)*pointsPerShard]); err != nil { + t.Fatalf("batch write: %s", err) + } + shards[shardID] = uint64(shardID) + } + return shards +} + +func testStoreMakeTimedFuncs(tested func(context.Context) (string, error), index string) func(*testing.T) { + cancelTested := func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(0)) + defer cancel() + + funcName, err := tested(ctx) + if err == nil { + t.Fatalf("%v: failed to time out with index type %v", funcName, index) + } else if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) { + t.Fatalf("%v: failed with %v instead of %v with index type %v", funcName, err, context.DeadlineExceeded, index) + } + } + return cancelTested +} + // Creates a large number of series in multiple shards, which will force // compactions to occur. func testStoreCardinalityCompactions(store *Store) error { @@ -1137,7 +1280,7 @@ func testStoreCardinalityCompactions(store *Store) error { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { return err } @@ -1148,7 +1291,7 @@ func testStoreCardinalityCompactions(store *Store) error { } // Estimate the measurement cardinality... - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { return err } @@ -1230,7 +1373,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) { } // Get updated series cardinality from store after writing data. - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -1249,7 +1392,7 @@ func TestStore_Sketches(t *testing.T) { checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error { // Get sketches and check cardinality... - sketch, tsketch, err := store.SeriesSketches("db") + sketch, tsketch, err := store.SeriesSketches(context.Background(), "db") if err != nil { return err } @@ -1275,7 +1418,7 @@ func TestStore_Sketches(t *testing.T) { } // Check measurement cardinality. - if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil { + if sketch, tsketch, err = store.MeasurementsSketches(context.Background(), "db"); err != nil { return err } @@ -1329,7 +1472,7 @@ func TestStore_Sketches(t *testing.T) { } // Delete half the the measurements data - mnames, err := store.MeasurementNames(nil, "db", nil) + mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil) if err != nil { return err } @@ -1462,9 +1605,8 @@ func TestStore_TagValues(t *testing.T) { }, } - var s *Store - setup := func(index string) []uint64 { // returns shard ids - s = MustOpenStore(index) + setup := func(index string) (*Store, []uint64) { // returns shard ids + s := MustOpenStore(index) fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d cpu1%[1]d,host=nofoo value=1 %[4]d @@ -1489,14 +1631,14 @@ func TestStore_TagValues(t *testing.T) { ids = append(ids, uint64(i)) s.MustCreateShardWithData("db0", "rp0", i, genPoints(i)...) } - return ids + return s, ids } for _, example := range examples { for _, index := range tsdb.RegisteredIndexes() { - shardIDs := setup(index) + s, shardIDs := setup(index) t.Run(example.Name+"_"+index, func(t *testing.T) { - got, err := s.TagValues(nil, shardIDs, example.Expr) + got, err := s.TagValues(context.Background(), nil, shardIDs, example.Expr) if err != nil { t.Fatal(err) } @@ -1538,7 +1680,7 @@ func TestStore_Measurements_Auth(t *testing.T) { }, } - names, err := s.MeasurementNames(authorizer, "db0", nil) + names, err := s.MeasurementNames(context.Background(), authorizer, "db0", nil) if err != nil { return err } @@ -1568,7 +1710,7 @@ func TestStore_Measurements_Auth(t *testing.T) { return err } - if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil { + if names, err = s.MeasurementNames(context.Background(), authorizer, "db0", nil); err != nil { return err } @@ -1625,7 +1767,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { }, } - keys, err := s.TagKeys(authorizer, []uint64{0}, nil) + keys, err := s.TagKeys(context.Background(), authorizer, []uint64{0}, nil) if err != nil { return err } @@ -1660,7 +1802,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { return err } - if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil { + if keys, err = s.TagKeys(context.Background(), authorizer, []uint64{0}, nil); err != nil { return err } @@ -1723,7 +1865,7 @@ func TestStore_TagValues_Auth(t *testing.T) { }, } - values, err := s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{ + values, err := s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{ Op: influxql.EQ, LHS: &influxql.VarRef{Val: "_tagKey"}, RHS: &influxql.StringLiteral{Val: "host"}, @@ -1763,7 +1905,7 @@ func TestStore_TagValues_Auth(t *testing.T) { return err } - values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{ + values, err = s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{ Op: influxql.EQ, LHS: &influxql.VarRef{Val: "_tagKey"}, RHS: &influxql.StringLiteral{Val: "host"}, @@ -1884,7 +2026,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { errC <- nil return default: - names, err := s.MeasurementNames(nil, "db0", nil) + names, err := s.MeasurementNames(context.Background(), nil, "db0", nil) if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { continue // These errors are expected } @@ -1969,7 +2111,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { errC <- nil return default: - keys, err := s.TagKeys(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) + keys, err := s.TagKeys(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { continue // These errors are expected } @@ -2070,7 +2212,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { } cond := rewrite.(*influxql.ShowTagValuesStatement).Condition - values, err := s.TagValues(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond) + values, err := s.TagValues(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond) if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { continue // These errors are expected } @@ -2132,7 +2274,7 @@ func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _ = store.SeriesCardinality("db") + _, _ = store.SeriesCardinality(context.Background(), "db") } }) store.Close() @@ -2214,8 +2356,7 @@ func BenchmarkStore_TagValues(b *testing.B) { {name: "s=10_m=100_v=1000", shards: 10, measurements: 100, tagValues: 1000}, } - var s *Store - setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids + setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids s := NewStore(index) if err := s.Open(); err != nil { panic(err) @@ -2251,13 +2392,7 @@ func BenchmarkStore_TagValues(b *testing.B) { shardIDs = append(shardIDs, uint64(i)) s.MustCreateShardWithData("db0", "rp0", i, genPoints(i, useRandom)...) } - return shardIDs - } - - teardown := func() { - if err := s.Close(); err != nil { - b.Fatal(err) - } + return s, shardIDs } // SHOW TAG VALUES WITH KEY IN ("host", "shard") @@ -2296,14 +2431,20 @@ func BenchmarkStore_TagValues(b *testing.B) { for useRand := 0; useRand < 2; useRand++ { for c, condition := range []influxql.Expr{cond1, cond2} { for _, bm := range benchmarks { - shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1) + s, shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1) + teardown := func() { + if err := s.Close(); err != nil { + b.Fatal(err) + } + } + cnd := "Unfiltered" if c == 0 { cnd = "Filtered" } b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - if tvResult, err = s.TagValues(nil, shardIDs, condition); err != nil { + if tvResult, err = s.TagValues(context.Background(), nil, shardIDs, condition); err != nil { b.Fatal(err) } }