Merge pull request #7312 from influxdata/jw-map-allocate
Reduce allocations in idsForExprpull/7345/head
commit
b3dd10ec20
23
tsdb/meta.go
23
tsdb/meta.go
|
@ -166,7 +166,7 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the in memory ID for query processing on this shard
|
// set the in memory ID for query processing on this shard
|
||||||
series.id = d.lastID + 1
|
series.ID = d.lastID + 1
|
||||||
d.lastID++
|
d.lastID++
|
||||||
|
|
||||||
series.measurement = m
|
series.measurement = m
|
||||||
|
@ -644,7 +644,7 @@ func (m *Measurement) HasSeries() bool {
|
||||||
// AddSeries will add a series to the measurementIndex. Returns false if already present
|
// AddSeries will add a series to the measurementIndex. Returns false if already present
|
||||||
func (m *Measurement) AddSeries(s *Series) bool {
|
func (m *Measurement) AddSeries(s *Series) bool {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
if _, ok := m.seriesByID[s.id]; ok {
|
if _, ok := m.seriesByID[s.ID]; ok {
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -653,12 +653,12 @@ func (m *Measurement) AddSeries(s *Series) bool {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
if _, ok := m.seriesByID[s.id]; ok {
|
if _, ok := m.seriesByID[s.ID]; ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
m.seriesByID[s.id] = s
|
m.seriesByID[s.ID] = s
|
||||||
m.seriesIDs = append(m.seriesIDs, s.id)
|
m.seriesIDs = append(m.seriesIDs, s.ID)
|
||||||
|
|
||||||
// the series ID should always be higher than all others because it's a new
|
// the series ID should always be higher than all others because it's a new
|
||||||
// series. So don't do the sort if we don't have to.
|
// series. So don't do the sort if we don't have to.
|
||||||
|
@ -674,7 +674,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
|
||||||
m.seriesByTagKeyValue[string(t.Key)] = valueMap
|
m.seriesByTagKeyValue[string(t.Key)] = valueMap
|
||||||
}
|
}
|
||||||
ids := valueMap[string(t.Value)]
|
ids := valueMap[string(t.Value)]
|
||||||
ids = append(ids, s.id)
|
ids = append(ids, s.ID)
|
||||||
|
|
||||||
// most of the time the series ID will be higher than all others because it's a new
|
// most of the time the series ID will be higher than all others because it's a new
|
||||||
// series. So don't do the sort if we don't have to.
|
// series. So don't do the sort if we don't have to.
|
||||||
|
@ -689,7 +689,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
|
||||||
|
|
||||||
// DropSeries will remove a series from the measurementIndex.
|
// DropSeries will remove a series from the measurementIndex.
|
||||||
func (m *Measurement) DropSeries(series *Series) {
|
func (m *Measurement) DropSeries(series *Series) {
|
||||||
seriesID := series.id
|
seriesID := series.ID
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
@ -889,6 +889,11 @@ func mergeSeriesFilters(op influxql.Token, ids SeriesIDs, lfilters, rfilters Fil
|
||||||
return series, filters
|
return series, filters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs {
|
||||||
|
ids, _, _ := m.idsForExpr(n)
|
||||||
|
return ids
|
||||||
|
}
|
||||||
|
|
||||||
// idsForExpr will return a collection of series ids and a filter expression that should
|
// idsForExpr will return a collection of series ids and a filter expression that should
|
||||||
// be used to filter points from those series.
|
// be used to filter points from those series.
|
||||||
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) {
|
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) {
|
||||||
|
@ -1002,6 +1007,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
||||||
}
|
}
|
||||||
ids = seriesIDs.evict()
|
ids = seriesIDs.evict()
|
||||||
} else if empty && n.Op == influxql.NEQREGEX {
|
} else if empty && n.Op == influxql.NEQREGEX {
|
||||||
|
ids = make(SeriesIDs, 0, len(m.seriesIDs))
|
||||||
for k := range tagVals {
|
for k := range tagVals {
|
||||||
if !re.Val.MatchString(k) {
|
if !re.Val.MatchString(k) {
|
||||||
ids = append(ids, tagVals[k]...)
|
ids = append(ids, tagVals[k]...)
|
||||||
|
@ -1009,6 +1015,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
||||||
}
|
}
|
||||||
sort.Sort(ids)
|
sort.Sort(ids)
|
||||||
} else if !empty && n.Op == influxql.EQREGEX {
|
} else if !empty && n.Op == influxql.EQREGEX {
|
||||||
|
ids = make(SeriesIDs, 0, len(m.seriesIDs))
|
||||||
for k := range tagVals {
|
for k := range tagVals {
|
||||||
if re.Val.MatchString(k) {
|
if re.Val.MatchString(k) {
|
||||||
ids = append(ids, tagVals[k]...)
|
ids = append(ids, tagVals[k]...)
|
||||||
|
@ -1462,7 +1469,7 @@ type Series struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
Key string
|
Key string
|
||||||
Tags models.Tags
|
Tags models.Tags
|
||||||
id uint64
|
ID uint64
|
||||||
measurement *Measurement
|
measurement *Measurement
|
||||||
shardIDs []uint64 // shards that have this series defined
|
shardIDs []uint64 // shards that have this series defined
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package tsdb_test
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/influxql"
|
"github.com/influxdata/influxdb/influxql"
|
||||||
|
@ -92,6 +93,69 @@ func Test_SeriesIDs_Reject(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
|
||||||
|
m := tsdb.NewMeasurement("cpu")
|
||||||
|
for i := 0; i < 100000; i++ {
|
||||||
|
s := tsdb.NewSeries("cpu", models.Tags{models.Tag{
|
||||||
|
Key: []byte("host"),
|
||||||
|
Value: []byte(fmt.Sprintf("host%d", i))}})
|
||||||
|
s.ID = uint64(i)
|
||||||
|
m.AddSeries(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||||
|
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement()
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("invalid statement: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
selectStmt := stmt.(*influxql.SelectStatement)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||||
|
if exp, got := 100000, len(ids); exp != got {
|
||||||
|
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
|
||||||
|
m := tsdb.NewMeasurement("cpu")
|
||||||
|
for i := 0; i < 100000; i++ {
|
||||||
|
s := tsdb.NewSeries("cpu", models.Tags{models.Tag{
|
||||||
|
Key: []byte("host"),
|
||||||
|
Value: []byte(fmt.Sprintf("host%d", i))}})
|
||||||
|
s.ID = uint64(i)
|
||||||
|
m.AddSeries(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||||
|
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement()
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("invalid statement: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
selectStmt := stmt.(*influxql.SelectStatement)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||||
|
if exp, got := 100000, len(ids); exp != got {
|
||||||
|
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure tags can be marshaled into a byte slice.
|
// Ensure tags can be marshaled into a byte slice.
|
||||||
func TestMarshalTags(t *testing.T) {
|
func TestMarshalTags(t *testing.T) {
|
||||||
for i, tt := range []struct {
|
for i, tt := range []struct {
|
||||||
|
|
Loading…
Reference in New Issue