diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 34c68db9c0..ee6fa7a9b3 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -772,7 +772,7 @@ func (i *Index) DropSeriesGlobal(key []byte, ts int64) error { } // TagSets returns a list of tag sets. -func (i *Index) TagSets(shardID uint64, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { +func (i *Index) TagSets(shardSeriesIDs *tsdb.SeriesIDSet, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { i.mu.RLock() defer i.mu.RUnlock() @@ -781,7 +781,7 @@ func (i *Index) TagSets(shardID uint64, name []byte, opt query.IteratorOptions) return nil, nil } - tagSets, err := mm.TagSets(shardID, opt) + tagSets, err := mm.TagSets(shardSeriesIDs, opt) if err != nil { return nil, err } @@ -1168,7 +1168,7 @@ func (idx *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tag // TagSets returns a list of tag sets based on series filtering. func (idx *ShardIndex) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { - return idx.Index.TagSets(idx.id, name, opt) + return idx.Index.TagSets(idx.seriesIDSet, name, opt) } // SeriesIDSet returns the bitset associated with the series ids. diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index 747d2bfee8..decc0f7105 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -365,7 +365,7 @@ func (m *measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags // This will also populate the TagSet objects with the series IDs that match each tagset and any // influx filter expression that goes with the series // TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. -func (m *measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*query.TagSet, error) { +func (m *measurement) TagSets(shardSeriesIDs *tsdb.SeriesIDSet, opt query.IteratorOptions) ([]*query.TagSet, error) { // get the unique set of series ids and the filters that should be applied to each ids, filters, err := m.filters(opt.Condition) if err != nil { @@ -400,7 +400,7 @@ func (m *measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que } s := m.seriesByID[id] - if s == nil || s.Deleted() { + if s == nil || s.Deleted() || !shardSeriesIDs.Contains(id) { continue } diff --git a/tsdb/index/inmem/meta_test.go b/tsdb/index/inmem/meta_test.go index 32b7edebc2..48424c4464 100644 --- a/tsdb/index/inmem/meta_test.go +++ b/tsdb/index/inmem/meta_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" ) @@ -135,7 +136,9 @@ func TestMeasurement_TagsSet_Deadlock(t *testing.T) { m.DropSeries(s1) // This was deadlocking - m.TagSets(1, query.IteratorOptions{}) + s := tsdb.NewSeriesIDSet() + s.Add(1) + m.TagSets(s, query.IteratorOptions{}) if got, exp := len(m.SeriesIDs()), 1; got != exp { t.Fatalf("series count mismatch: got %v, exp %v", got, exp) } @@ -204,19 +207,22 @@ func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) { func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) { m := newMeasurement("foo", "m") + ss := tsdb.NewSeriesIDSet() + for i := 0; i < n; i++ { tags := map[string]string{"tag1": "value1", "tag2": "value2"} s := newSeries(uint64(i), m, fmt.Sprintf("m,tag1=value1,tag2=value2"), models.NewTags(tags)) + ss.Add(uint64(i)) m.AddSeries(s) } // warm caches - m.TagSets(0, opt) + m.TagSets(ss, opt) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - m.TagSets(0, opt) + m.TagSets(ss, opt) } }