Merge pull request #9494 from influxdata/jw-tagsets-contains
Skip creating cursors for series not in a shardpull/9496/head
commit
4eaae6111e
|
|
@ -772,7 +772,7 @@ func (i *Index) DropSeriesGlobal(key []byte, ts int64) error {
|
|||
}
|
||||
|
||||
// TagSets returns a list of tag sets.
|
||||
func (i *Index) TagSets(shardID uint64, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
func (i *Index) TagSets(shardSeriesIDs *tsdb.SeriesIDSet, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
|
|
@ -781,7 +781,7 @@ func (i *Index) TagSets(shardID uint64, name []byte, opt query.IteratorOptions)
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
tagSets, err := mm.TagSets(shardID, opt)
|
||||
tagSets, err := mm.TagSets(shardSeriesIDs, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -1168,7 +1168,7 @@ func (idx *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tag
|
|||
|
||||
// TagSets returns a list of tag sets based on series filtering.
|
||||
func (idx *ShardIndex) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
return idx.Index.TagSets(idx.id, name, opt)
|
||||
return idx.Index.TagSets(idx.seriesIDSet, name, opt)
|
||||
}
|
||||
|
||||
// SeriesIDSet returns the bitset associated with the series ids.
|
||||
|
|
|
|||
|
|
@ -365,7 +365,7 @@ func (m *measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags
|
|||
// This will also populate the TagSet objects with the series IDs that match each tagset and any
|
||||
// influx filter expression that goes with the series
|
||||
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
|
||||
func (m *measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
func (m *measurement) TagSets(shardSeriesIDs *tsdb.SeriesIDSet, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
// get the unique set of series ids and the filters that should be applied to each
|
||||
ids, filters, err := m.filters(opt.Condition)
|
||||
if err != nil {
|
||||
|
|
@ -400,7 +400,7 @@ func (m *measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que
|
|||
}
|
||||
|
||||
s := m.seriesByID[id]
|
||||
if s == nil || s.Deleted() {
|
||||
if s == nil || s.Deleted() || !shardSeriesIDs.Contains(id) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
|
|
@ -135,7 +136,9 @@ func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
|
|||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
m.TagSets(1, query.IteratorOptions{})
|
||||
s := tsdb.NewSeriesIDSet()
|
||||
s.Add(1)
|
||||
m.TagSets(s, query.IteratorOptions{})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
|
@ -204,19 +207,22 @@ func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
|
|||
|
||||
func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) {
|
||||
m := newMeasurement("foo", "m")
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
|
||||
s := newSeries(uint64(i), m, fmt.Sprintf("m,tag1=value1,tag2=value2"), models.NewTags(tags))
|
||||
ss.Add(uint64(i))
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
// warm caches
|
||||
m.TagSets(0, opt)
|
||||
m.TagSets(ss, opt)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
m.TagSets(0, opt)
|
||||
m.TagSets(ss, opt)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue