diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index f9f9abd447..03ce6bb84c 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -31,7 +31,7 @@ type Engine interface { influxdb.BackupService influxdb.RestoreService - SeriesCardinality(orgID, bucketID platform.ID) int64 + SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 TSDBStore() storage.TSDBStore MetaClient() storage.MetaClient @@ -116,8 +116,8 @@ func (t *TemporaryEngine) WritePoints(ctx context.Context, orgID platform.ID, bu } // SeriesCardinality returns the number of series in the engine. -func (t *TemporaryEngine) SeriesCardinality(orgID, bucketID platform.ID) int64 { - return t.engine.SeriesCardinality(orgID, bucketID) +func (t *TemporaryEngine) SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 { + return t.engine.SeriesCardinality(ctx, bucketID) } // DeleteBucketRangePredicate will delete a bucket from the range and predicate. diff --git a/cmd/influxd/launcher/storage_test.go b/cmd/influxd/launcher/storage_test.go index d08bc2d632..8c481ea4f5 100644 --- a/cmd/influxd/launcher/storage_test.go +++ b/cmd/influxd/launcher/storage_test.go @@ -128,7 +128,7 @@ func TestLauncher_BucketDelete(t *testing.T) { // Verify the cardinality in the engine. engine := l.Launcher.Engine() - if got, exp := engine.SeriesCardinality(l.Org.ID, l.Bucket.ID), int64(1); got != exp { + if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(1); got != exp { t.Fatalf("got %d, exp %d", got, exp) } @@ -150,7 +150,7 @@ func TestLauncher_BucketDelete(t *testing.T) { } // Verify that the data has been removed from the storage engine. - if got, exp := engine.SeriesCardinality(l.Org.ID, l.Bucket.ID), int64(0); got != exp { + if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(0); got != exp { t.Fatalf("after bucket delete got %d, exp %d", got, exp) } } diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index cdec05f466..3780cfb350 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -1,6 +1,7 @@ package internal import ( + "context" "io" "time" @@ -30,7 +31,7 @@ type TSDBStoreMock struct { ImportShardFn func(id uint64, r io.Reader) error MeasurementSeriesCountsFn func(database string) (measurements int, series int) MeasurementsCardinalityFn func(database string) (int64, error) - MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) OpenFn func() error PathFn func() string RestoreShardFn func(id uint64, r io.Reader) error @@ -43,8 +44,8 @@ type TSDBStoreMock struct { ShardRelativePathFn func(id uint64) (string, error) ShardsFn func(ids []uint64) []*tsdb.Shard StatisticsFn func(tags map[string]string) []models.Statistic - TagKeysFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValuesFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + TagKeysFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValuesFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) WithLoggerFn func(log *zap.Logger) WriteToShardFn func(shardID uint64, points []models.Point) error } @@ -92,8 +93,8 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error { return s.ImportShardFn(id, r) } -func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { - return s.MeasurementNamesFn(auth, database, cond) +func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { + return s.MeasurementNamesFn(ctx, auth, database, cond) } func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) { return s.MeasurementSeriesCountsFn(database) @@ -137,11 +138,11 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard { func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic { return s.StatisticsFn(tags) } -func (s *TSDBStoreMock) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) { - return s.TagKeysFn(auth, shardIDs, cond) +func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) { + return s.TagKeysFn(ctx, auth, shardIDs, cond) } -func (s *TSDBStoreMock) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) { - return s.TagValuesFn(auth, shardIDs, cond) +func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) { + return s.TagValuesFn(ctx, auth, shardIDs, cond) } func (s *TSDBStoreMock) WithLogger(log *zap.Logger) { s.WithLoggerFn(log) diff --git a/storage/engine.go b/storage/engine.go index 7b9e1bff7e..f08fc5a50c 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -90,11 +90,11 @@ type MetaClient interface { type TSDBStore interface { DeleteMeasurement(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error - MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) ShardGroup(ids []uint64) tsdb.ShardGroup Shards(ids []uint64) []*tsdb.Shard - TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) } // NewEngine initialises a new storage engine, including a series file, index and @@ -463,14 +463,14 @@ func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) } // SeriesCardinality returns the number of series in the engine. -func (e *Engine) SeriesCardinality(orgID, bucketID platform.ID) int64 { +func (e *Engine) SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 { e.mu.RLock() defer e.mu.RUnlock() if e.closing == nil { return 0 } - n, err := e.tsdbStore.SeriesCardinality(bucketID.String()) + n, err := e.tsdbStore.SeriesCardinality(ctx, bucketID.String()) if err != nil { return 0 } diff --git a/tsdb/store.go b/tsdb/store.go index 149639ba36..dbc8bb3075 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -139,13 +139,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { statistics := make([]models.Statistic, 0, len(databases)) for _, database := range databases { log := s.Logger.With(logger.Database(database)) - sc, err := s.SeriesCardinality(database) + sc, err := s.SeriesCardinality(context.Background(), database) if err != nil { log.Info("Cannot retrieve series cardinality", zap.Error(err)) continue } - mc, err := s.MeasurementsCardinality(database) + mc, err := s.MeasurementsCardinality(context.Background(), database) if err != nil { log.Info("Cannot retrieve measurement cardinality", zap.Error(err)) continue @@ -1049,7 +1049,7 @@ func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (est // Cardinality is calculated exactly by unioning all shards' bitsets of series // IDs. The result of this method cannot be combined with any other results. // -func (s *Store) SeriesCardinality(database string) (int64, error) { +func (s *Store) SeriesCardinality(ctx context.Context, database string) (int64, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() @@ -1057,7 +1057,12 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { var setMu sync.Mutex others := make([]*SeriesIDSet, 0, len(shards)) - s.walkShards(shards, func(sh *Shard) error { + err := s.walkShards(shards, func(sh *Shard) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } index, err := sh.Index() if err != nil { return err @@ -1070,9 +1075,17 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { return nil }) + if err != nil { + return 0, err + } ss := NewSeriesIDSet() ss.Merge(others...) + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } return int64(ss.Cardinality()), nil } @@ -1081,8 +1094,13 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { // // The returned sketches can be combined with other sketches to provide an // estimation across distributed databases. -func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) { +func (s *Store) SeriesSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) { return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } if sh == nil { return nil, nil, errors.New("shard nil, can't get cardinality") } @@ -1095,13 +1113,8 @@ func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Ske // // Cardinality is calculated using a sketch-based estimation. The result of this // method cannot be combined with any other results. -func (s *Store) MeasurementsCardinality(database string) (int64, error) { - ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { - if sh == nil { - return nil, nil, errors.New("shard nil, can't get cardinality") - } - return sh.MeasurementsSketches() - }) +func (s *Store) MeasurementsCardinality(ctx context.Context, database string) (int64, error) { + ss, ts, err := s.MeasurementsSketches(ctx, database) if err != nil { return 0, err @@ -1114,8 +1127,14 @@ func (s *Store) MeasurementsCardinality(database string) (int64, error) { // // The returned sketches can be combined with other sketches to provide an // estimation across distributed databases. -func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) { +func (s *Store) MeasurementsSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) { return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { + // every iteration, check for timeout. + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } if sh == nil { return nil, nil, errors.New("shard nil, can't get cardinality") } @@ -1430,7 +1449,7 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { // MeasurementNames returns a slice of all measurements. Measurements accepts an // optional condition expression. If cond is nil, then all measurements for the // database will be returned. -func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { +func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() @@ -1449,6 +1468,11 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in } is.Indexes = append(is.Indexes, index) } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } return is.MeasurementNamesByExpr(auth, cond) } @@ -1471,7 +1495,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement } // TagKeys returns the tag keys in the given database, matching the condition. -func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) { +func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) { if len(shardIDs) == 0 { return nil, nil } @@ -1543,6 +1567,13 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql. var results []TagKeys for _, name := range names { + // Check for timeouts. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // Build keyset over all indexes for measurement. tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil) if err != nil { @@ -1556,6 +1587,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql. // If they have authorized series associated with them. if filterExpr == nil { for tagKey := range tagKeySet { + // check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey)) if err != nil { return nil, err @@ -1636,7 +1673,7 @@ func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[ // TagValues returns the tag keys and values for the provided shards, where the // tag values satisfy the provided condition. -func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) { +func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) { if cond == nil { return nil, errors.New("a condition is required") } @@ -1724,6 +1761,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq // values from matching series. Series may be filtered using a WHERE // filter. for _, name := range names { + // check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // Determine a list of keys from condition. keySet, err := is.MeasurementTagKeysByExpr(name, cond) if err != nil { @@ -1786,6 +1830,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq // instances of tagValues for a given measurement. idxBuf := make([][2]int, 0, len(is.Indexes)) for i < len(allResults) { + // check for timeouts + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // Gather all occurrences of the same measurement for merging. for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) { j++ diff --git a/tsdb/store_test.go b/tsdb/store_test.go index afd5b5b7d2..4d5fe95d8f 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/influxdb/v2/pkg/slices" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxql" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -243,7 +244,7 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) { t.Fatal(err) } - measurements, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil) + measurements, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) if err != nil { t.Fatal(err) } @@ -302,7 +303,7 @@ func TestStore_WriteMixedShards(t *testing.T) { wg.Wait() - keys, err := s.TagKeys(nil, []uint64{1, 2}, nil) + keys, err := s.TagKeys(context.Background(), nil, []uint64{1, 2}, nil) if err != nil { t.Fatal(err) } @@ -400,7 +401,7 @@ func TestStore_DeleteShard(t *testing.T) { // cpu,serverb=b should be removed from the series file for db0 because // shard 1 was the only owner of that series. // Verify by getting all tag keys. - keys, err := s.TagKeys(nil, []uint64{2}, nil) + keys, err := s.TagKeys(context.Background(), nil, []uint64{2}, nil) if err != nil { return err } @@ -415,7 +416,7 @@ func TestStore_DeleteShard(t *testing.T) { // Verify that the same series was not removed from other databases' // series files. - if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil { + if keys, err = s.TagKeys(context.Background(), nil, []uint64{3}, nil); err != nil { return err } @@ -801,7 +802,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) { `cpu value=3 20`, ) - meas, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil) + meas, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) if err != nil { t.Fatalf("unexpected error with MeasurementNames: %v", err) } @@ -842,7 +843,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { } // Delete all the series for each measurement. - mnames, err := store.MeasurementNames(nil, "db", nil) + mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil) if err != nil { t.Fatal(err) } @@ -854,7 +855,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -866,7 +867,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { // Since all the series have been deleted, all the measurements should have // been removed from the index too. - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { t.Fatal(err) } @@ -919,7 +920,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -930,7 +931,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) { } // Estimate the measurement cardinality... - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { t.Fatal(err) } @@ -999,7 +1000,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { t.Fatal(err) } @@ -1010,7 +1011,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) { } // Estimate the measurement cardinality... - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { t.Fatal(err) } @@ -1041,6 +1042,149 @@ func TestStore_Cardinality_Duplicates(t *testing.T) { } } +func TestStore_MetaQuery_Timeout(t *testing.T) { + if testing.Short() || os.Getenv("APPVEYOR") != "" { + t.Skip("Skipping test in short and appveyor mode.") + } + + test := func(t *testing.T, index string) { + store := NewStore(t, index) + require.NoError(t, store.Open()) + defer store.Close() + testStoreMetaQueryTimeout(t, store, index) + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { + test(t, index) + }) + } +} + +func testStoreMetaQueryTimeout(t *testing.T, store *Store, index string) { + shards := testStoreMetaQuerySetup(t, store) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "SeriesCardinality" + _, err := store.Store.SeriesCardinality(ctx, "db") + return funcName, err + }, index)(t) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "MeasurementsCardinality" + _, err := store.Store.MeasurementsCardinality(ctx, "db") + return funcName, err + }, index)(t) + + keyCondition, allCondition := testStoreMetaQueryCondition() + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "TagValues" + _, err := store.Store.TagValues(ctx, nil, shards, allCondition) + return funcName, err + }, index)(t) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "TagKeys" + _, err := store.Store.TagKeys(ctx, nil, shards, keyCondition) + return funcName, err + }, index)(t) + + testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { + const funcName = "MeasurementNames" + _, err := store.Store.MeasurementNames(ctx, nil, "db", nil) + return funcName, err + }, index)(t) +} + +func testStoreMetaQueryCondition() (influxql.Expr, influxql.Expr) { + keyCondition := &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.OR, + LHS: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "_tagKey"}, + RHS: &influxql.StringLiteral{Val: "tagKey4"}, + }, + RHS: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "_tagKey"}, + RHS: &influxql.StringLiteral{Val: "tagKey5"}, + }, + }, + } + + whereCondition := &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "tagKey1"}, + RHS: &influxql.StringLiteral{Val: "tagValue2"}, + }, + }, + RHS: keyCondition, + }, + } + + allCondition := &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: &influxql.ParenExpr{ + Expr: &influxql.BinaryExpr{ + Op: influxql.EQREGEX, + LHS: &influxql.VarRef{Val: "tagKey3"}, + RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`tagValue\d`)}, + }, + }, + RHS: whereCondition, + } + return keyCondition, allCondition +} + +func testStoreMetaQuerySetup(t *testing.T, store *Store) []uint64 { + const measurementCnt = 64 + const tagCnt = 5 + const valueCnt = 5 + const pointsPerShard = 20000 + + // Generate point data to write to the shards. + series := genTestSeries(measurementCnt, tagCnt, valueCnt) + + points := make([]models.Point, 0, len(series)) + for _, s := range series { + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) + } + // Create requested number of shards in the store & write points across + // shards such that we never write the same series to multiple shards. + shards := make([]uint64, len(points)/pointsPerShard) + for shardID := 0; shardID < len(points)/pointsPerShard; shardID++ { + if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { + t.Fatalf("create shard: %s", err) + } + if err := store.BatchWrite(shardID, points[shardID*pointsPerShard:(shardID+1)*pointsPerShard]); err != nil { + t.Fatalf("batch write: %s", err) + } + shards[shardID] = uint64(shardID) + } + return shards +} + +func testStoreMakeTimedFuncs(tested func(context.Context) (string, error), index string) func(*testing.T) { + cancelTested := func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(0)) + defer cancel() + + funcName, err := tested(ctx) + if err == nil { + t.Fatalf("%v: failed to time out with index type %v", funcName, index) + } else if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) { + t.Fatalf("%v: failed with %v instead of %v with index type %v", funcName, err, context.DeadlineExceeded, index) + } + } + return cancelTested +} + // Creates a large number of series in multiple shards, which will force // compactions to occur. func testStoreCardinalityCompactions(store *Store) error { @@ -1066,7 +1210,7 @@ func testStoreCardinalityCompactions(store *Store) error { } // Estimate the series cardinality... - cardinality, err := store.Store.SeriesCardinality("db") + cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") if err != nil { return err } @@ -1077,7 +1221,7 @@ func testStoreCardinalityCompactions(store *Store) error { } // Estimate the measurement cardinality... - if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil { + if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { return err } @@ -1116,7 +1260,7 @@ func TestStore_Sketches(t *testing.T) { checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error { // Get sketches and check cardinality... - sketch, tsketch, err := store.SeriesSketches("db") + sketch, tsketch, err := store.SeriesSketches(context.Background(), "db") if err != nil { return err } @@ -1142,7 +1286,7 @@ func TestStore_Sketches(t *testing.T) { } // Check measurement cardinality. - if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil { + if sketch, tsketch, err = store.MeasurementsSketches(context.Background(), "db"); err != nil { return err } @@ -1196,7 +1340,7 @@ func TestStore_Sketches(t *testing.T) { } // Delete half the the measurements data - mnames, err := store.MeasurementNames(nil, "db", nil) + mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil) if err != nil { return err } @@ -1322,9 +1466,8 @@ func TestStore_TagValues(t *testing.T) { }, } - var s *Store - setup := func(t *testing.T, index string) []uint64 { // returns shard ids - s = MustOpenStore(t, index) + setup := func(t *testing.T, index string) (*Store, []uint64) { // returns shard ids + s := MustOpenStore(t, index) fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d cpu1%[1]d,host=nofoo value=1 %[4]d @@ -1349,14 +1492,15 @@ func TestStore_TagValues(t *testing.T) { ids = append(ids, uint64(i)) s.MustCreateShardWithData("db0", "rp0", i, genPoints(i)...) } - return ids + return s, ids } for _, example := range examples { for _, index := range tsdb.RegisteredIndexes() { t.Run(example.Name+"_"+index, func(t *testing.T) { - shardIDs := setup(t, index) - got, err := s.TagValues(nil, shardIDs, example.Expr) + s, shardIDs := setup(t, index) + defer s.Close() + got, err := s.TagValues(context.Background(), nil, shardIDs, example.Expr) if err != nil { t.Fatal(err) } @@ -1366,7 +1510,6 @@ func TestStore_TagValues(t *testing.T) { t.Fatalf("got:\n%#v\n\nexp:\n%#v", got, exp) } }) - s.Close() } } } @@ -1397,7 +1540,7 @@ func TestStore_Measurements_Auth(t *testing.T) { }, } - names, err := s.MeasurementNames(authorizer, "db0", nil) + names, err := s.MeasurementNames(context.Background(), authorizer, "db0", nil) if err != nil { return err } @@ -1427,7 +1570,7 @@ func TestStore_Measurements_Auth(t *testing.T) { return err } - if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil { + if names, err = s.MeasurementNames(context.Background(), authorizer, "db0", nil); err != nil { return err } @@ -1483,7 +1626,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { }, } - keys, err := s.TagKeys(authorizer, []uint64{0}, nil) + keys, err := s.TagKeys(context.Background(), authorizer, []uint64{0}, nil) if err != nil { return err } @@ -1518,7 +1661,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { return err } - if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil { + if keys, err = s.TagKeys(context.Background(), authorizer, []uint64{0}, nil); err != nil { return err } @@ -1580,7 +1723,7 @@ func TestStore_TagValues_Auth(t *testing.T) { }, } - values, err := s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{ + values, err := s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{ Op: influxql.EQ, LHS: &influxql.VarRef{Val: "_tagKey"}, RHS: &influxql.StringLiteral{Val: "host"}, @@ -1620,7 +1763,7 @@ func TestStore_TagValues_Auth(t *testing.T) { return err } - values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{ + values, err = s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{ Op: influxql.EQ, LHS: &influxql.VarRef{Val: "_tagKey"}, RHS: &influxql.StringLiteral{Val: "host"}, @@ -1741,7 +1884,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { errC <- nil return default: - names, err := s.MeasurementNames(nil, "db0", nil) + names, err := s.MeasurementNames(context.Background(), nil, "db0", nil) if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { continue // These errors are expected } @@ -1826,7 +1969,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { errC <- nil return default: - keys, err := s.TagKeys(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) + keys, err := s.TagKeys(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { continue // These errors are expected } @@ -1927,7 +2070,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { } cond := rewrite.(*influxql.ShowTagValuesStatement).Condition - values, err := s.TagValues(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond) + values, err := s.TagValues(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond) if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { continue // These errors are expected } @@ -1989,7 +2132,7 @@ func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _ = store.SeriesCardinality("db") + _, _ = store.SeriesCardinality(context.Background(), "db") } }) store.Close() @@ -2071,8 +2214,7 @@ func BenchmarkStore_TagValues(b *testing.B) { {name: "s=10_m=100_v=1000", shards: 10, measurements: 100, tagValues: 1000}, } - var s *Store - setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids + setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids s := NewStore(b, index) if err := s.Open(); err != nil { panic(err) @@ -2108,13 +2250,7 @@ func BenchmarkStore_TagValues(b *testing.B) { shardIDs = append(shardIDs, uint64(i)) s.MustCreateShardWithData("db0", "rp0", i, genPoints(i, useRandom)...) } - return shardIDs - } - - teardown := func() { - if err := s.Close(); err != nil { - b.Fatal(err) - } + return s, shardIDs } // SHOW TAG VALUES WITH KEY IN ("host", "shard") @@ -2153,14 +2289,19 @@ func BenchmarkStore_TagValues(b *testing.B) { for useRand := 0; useRand < 2; useRand++ { for c, condition := range []influxql.Expr{cond1, cond2} { for _, bm := range benchmarks { - shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1) + s, shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1) + teardown := func() { + if err := s.Close(); err != nil { + b.Fatal(err) + } + } cnd := "Unfiltered" if c == 0 { cnd = "Filtered" } b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - if tvResult, err = s.TagValues(nil, shardIDs, condition); err != nil { + if tvResult, err = s.TagValues(context.Background(), nil, shardIDs, condition); err != nil { b.Fatal(err) } } diff --git a/v1/coordinator/statement_executor.go b/v1/coordinator/statement_executor.go index aa2a6dc489..f7db67e3be 100644 --- a/v1/coordinator/statement_executor.go +++ b/v1/coordinator/statement_executor.go @@ -396,7 +396,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(ctx context.Context return err } - names, err := e.TSDBStore.MeasurementNames(ectx.Authorizer, mapping.BucketID.String(), q.Condition) + names, err := e.TSDBStore.MeasurementNames(ctx, ectx.Authorizer, mapping.BucketID.String(), q.Condition) if err != nil || len(names) == 0 { return ectx.Send(ctx, &query.Result{ Err: err, @@ -510,7 +510,7 @@ func (e *StatementExecutor) executeShowTagKeys(ctx context.Context, q *influxql. } } - tagKeys, err := e.TSDBStore.TagKeys(ectx.Authorizer, shardIDs, cond) + tagKeys, err := e.TSDBStore.TagKeys(ctx, ectx.Authorizer, shardIDs, cond) if err != nil { return ectx.Send(ctx, &query.Result{ Err: err, @@ -602,7 +602,7 @@ func (e *StatementExecutor) executeShowTagValues(ctx context.Context, q *influxq } } - tagValues, err := e.TSDBStore.TagValues(ectx.Authorizer, shardIDs, cond) + tagValues, err := e.TSDBStore.TagValues(ctx, ectx.Authorizer, shardIDs, cond) if err != nil { return ectx.Send(ctx, &query.Result{Err: err}) } @@ -759,9 +759,9 @@ func (m mappings) DefaultRetentionPolicy(db string) string { type TSDBStore interface { DeleteMeasurement(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error - MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) - TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) } var _ TSDBStore = LocalTSDBStore{} diff --git a/v1/coordinator/statement_executor_test.go b/v1/coordinator/statement_executor_test.go index bb10f8e83e..3dc532c96b 100644 --- a/v1/coordinator/statement_executor_test.go +++ b/v1/coordinator/statement_executor_test.go @@ -5,12 +5,13 @@ import ( "context" "errors" "fmt" - "github.com/influxdata/influxdb/v2/kit/platform" "reflect" "regexp" "testing" "time" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/davecgh/go-spew/spew" "github.com/golang/mock/gomock" "github.com/influxdata/influxdb/v2" @@ -413,11 +414,11 @@ func NewQueryExecutor(t *testing.T, opts ...optFn) *QueryExecutor { return nil } - e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { + e.TSDBStore.MeasurementNamesFn = func(_ context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { return nil, nil } - e.TSDBStore.TagValuesFn = func(_ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) { + e.TSDBStore.TagValuesFn = func(_ context.Context, _ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) { return nil, nil } diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index 24a454703b..ac0fc7f704 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -28,11 +28,11 @@ var ( ) type TSDBStore interface { - MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) ShardGroup(ids []uint64) tsdb.ShardGroup Shards(ids []uint64) []*tsdb.Shard - TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) } type MetaClient interface { @@ -336,7 +336,7 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - keys, err := s.TSDBStore.TagKeys(auth, shardIDs, expr) + keys, err := s.TSDBStore.TagKeys(ctx, auth, shardIDs, expr) if err != nil { return cursors.EmptyStringIterator, err } @@ -459,7 +459,7 @@ func (s *Store) tagValues(ctx context.Context, mqAttrs *metaqueryAttributes, tag // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - values, err := s.TSDBStore.TagValues(auth, shardIDs, mqAttrs.pred) + values, err := s.TSDBStore.TagValues(ctx, auth, shardIDs, mqAttrs.pred) if err != nil { return nil, err } @@ -491,7 +491,7 @@ func (s *Store) MeasurementNames(ctx context.Context, mqAttrs *metaqueryAttribut // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - values, err := s.TSDBStore.MeasurementNames(auth, mqAttrs.db, mqAttrs.pred) + values, err := s.TSDBStore.MeasurementNames(ctx, auth, mqAttrs.db, mqAttrs.pred) if err != nil { return nil, err }