Add tag value cardinality limit

pull/7430/head
Jason Wilder 2016-10-04 23:52:49 -06:00
parent 89c7572dd6
commit 8fce6bba48
5 changed files with 109 additions and 4 deletions

View File

@ -2,6 +2,10 @@
### Release Notes
### Breaking changes
* `max-values-per-tag` was added with a default of 100,000, but can be disabled by setting it to `0`. Existing measurements with tags that exceed this limit will continue to load, but writes that would cause the tag cardinality to increase will return an error. This limit can be used to prevent high-cardinality tag values from being written to a measurement.
### Features
- [#7415](https://github.com/influxdata/influxdb/pull/7415): Add sample function to query language.
@ -18,6 +22,7 @@
- [#7115](https://github.com/influxdata/influxdb/issues/7115): Feature request: `influx inspect -export` should dump WAL files.
- [#7388](https://github.com/influxdata/influxdb/pull/7388): Implement cumulative_sum() function.
- [#7441](https://github.com/influxdata/influxdb/pull/7441): Speed up shutdown by closing shards concurrently.
- [#7146](https://github.com/influxdata/influxdb/issues/7146): Add `max-values-per-tag` to limit high tag cardinality data.
### Bugfixes

View File

@ -149,6 +149,9 @@ type point struct {
    // cached version of parsed name from key
    cachedName string

    // cached version of parsed tags
    cachedTags Tags

    it fieldIterator
}
@ -1279,7 +1282,11 @@ func (p *point) Round(d time.Duration) {
// Tags returns the tag set for the point
func (p *point) Tags() Tags {
    if p.cachedTags != nil {
        return p.cachedTags
    }
    p.cachedTags = parseTags(p.key)
    return p.cachedTags
}
func parseTags(buf []byte) Tags {
@ -1332,6 +1339,7 @@ func MakeKey(name []byte, tags Tags) []byte {
// SetTags replaces the tags for the point
func (p *point) SetTags(tags Tags) {
    p.key = MakeKey([]byte(p.Name()), tags)
    p.cachedTags = tags
}
// AddTag adds or replaces a tag value for a point
@ -1339,6 +1347,7 @@ func (p *point) AddTag(key, value string) {
    tags := p.Tags()
    tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)})
    sort.Sort(tags)
    p.cachedTags = tags
    p.key = MakeKey([]byte(p.Name()), tags)
}
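The net effect of the points.go changes: `Tags()` now parses the series key at most once and caches the result, and `SetTags`/`AddTag` refresh that cache so it never goes stale. A minimal standalone sketch of the same memoization pattern, assuming a toy `point` type and a stand-in `parseTags` (not the real parser in models):

```go
package main

import "fmt"

type Tags map[string]string

type point struct {
    key        []byte
    cachedTags Tags
}

// parseTags stands in for the real key parser; the print makes re-parsing visible.
func parseTags(key []byte) Tags {
    fmt.Println("parsing", string(key))
    return Tags{"host": "server01"}
}

// Tags parses the key on first use and returns the cached tag set afterwards.
func (p *point) Tags() Tags {
    if p.cachedTags != nil {
        return p.cachedTags
    }
    p.cachedTags = parseTags(p.key)
    return p.cachedTags
}

// SetTags replaces the tags and refreshes the cache so it cannot go stale.
func (p *point) SetTags(tags Tags) {
    p.cachedTags = tags
}

func main() {
    p := &point{key: []byte("cpu,host=server01")}
    p.Tags() // prints "parsing ..." once
    p.Tags() // served from the cache, nothing printed
}
```

The write path in this commit calls `p.Tags()` on every point to run the cardinality check, so caching avoids re-parsing the key when the same tags are needed again later in the request.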

View File

@ -39,6 +39,9 @@ const (
    // DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
    DefaultMaxSeriesPerDatabase = 1000000

    // DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement.
    DefaultMaxValuesPerTag = 100000
)
// Config holds the configuration for the tsdb package.
@ -67,6 +70,11 @@ type Config struct {
    // A value of 0 disables the limit.
    MaxSeriesPerDatabase int `toml:"max-series-per-database"`

    // MaxValuesPerTag is the maximum number of tag values a single tag key can have within
    // a measurement. When the limit is exceeded, writes return an error.
    // A value of 0 disables the limit.
    MaxValuesPerTag int `toml:"max-values-per-tag"`

    TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
}
@ -85,6 +93,7 @@ func NewConfig() Config {
        CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),
        MaxSeriesPerDatabase:         DefaultMaxSeriesPerDatabase,
        MaxValuesPerTag:              DefaultMaxValuesPerTag,
        TraceLoggingEnabled:          false,
}
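For reference, a minimal sketch of reading and overriding the new limit programmatically, assuming the `github.com/influxdata/influxdb/tsdb` import path of this era; in the TOML config file the same knob is the `max-values-per-tag` key shown in the struct tag above, and `0` disables the check:

```go
package main

import (
    "fmt"

    "github.com/influxdata/influxdb/tsdb"
)

func main() {
    c := tsdb.NewConfig()

    // Default introduced by this commit: 100,000 values per tag key per measurement.
    fmt.Println(c.MaxValuesPerTag) // 100000

    // Tighten the limit, or set it to 0 to disable the check entirely.
    c.MaxValuesPerTag = 50000
}
```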

View File

@ -635,6 +635,17 @@ func (m *Measurement) HasTagKey(k string) bool {
    return hasTag
}

// HasTagKeyValue returns true if the measurement has a series containing the given
// tag key and tag value.
func (m *Measurement) HasTagKeyValue(k, v []byte) bool {
    m.mu.RLock()
    if vals, ok := m.seriesByTagKeyValue[string(k)]; ok {
        _, ok := vals[string(v)]
        m.mu.RUnlock()
        return ok
    }
    m.mu.RUnlock()
    return false
}
// HasSeries returns true if there is at least 1 series under this measurement
func (m *Measurement) HasSeries() bool {
m.mu.RLock()
@ -642,6 +653,15 @@ func (m *Measurement) HasSeries() bool {
return len(m.seriesByID) > 0
}
// Cardinality returns the number of values associated with the given tag key.
func (m *Measurement) Cardinality(key []byte) int {
    var n int
    m.mu.RLock()
    n = len(m.seriesByTagKeyValue[string(key)])
    m.mu.RUnlock()
    return n
}
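Both `HasTagKeyValue` and `Cardinality` read the measurement's `seriesByTagKeyValue` index, which maps tag key → tag value → the series carrying that pair. A minimal standalone sketch of that lookup pattern (the toy `index` map below is illustrative, not the actual `Measurement` internals):

```go
package main

import "fmt"

func main() {
    // tag key -> tag value -> series keys using that key/value pair
    index := map[string]map[string][]string{
        "host": {
            "server01": {"cpu,host=server01"},
            "server02": {"cpu,host=server02"},
        },
    }

    // HasTagKeyValue-style lookup: is host=server02 already in use?
    _, exists := index["host"]["server02"]
    fmt.Println(exists) // true

    // Cardinality-style lookup: how many distinct values does "host" have?
    fmt.Println(len(index["host"])) // 2
}
```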
// AddSeries will add a series to the measurementIndex. Returns false if already present
func (m *Measurement) AddSeries(s *Series) bool {
m.mu.RLock()
@ -1810,15 +1830,26 @@ func MarshalTags(tags map[string]string) []byte {
// TagKeys returns a list of the measurement's tag names.
func (m *Measurement) TagKeys() []string {
    m.mu.RLock()
    keys := make([]string, 0, len(m.seriesByTagKeyValue))
    for k := range m.seriesByTagKeyValue {
        keys = append(keys, k)
    }
    m.mu.RUnlock()
    sort.Strings(keys)
    return keys
}
// TagKeysBytes returns the measurement's tag keys as a sorted slice of byte slices.
func (m *Measurement) TagKeysBytes() [][]byte {
    m.mu.RLock()
    keys := make([][]byte, 0, len(m.seriesByTagKeyValue))
    for k := range m.seriesByTagKeyValue {
        keys = append(keys, []byte(k))
    }
    m.mu.RUnlock()
    sort.Sort(bytesSlice(keys))
    return keys
}
// TagValues returns all the values for the given tag key
func (m *Measurement) TagValues(key string) []string {
m.mu.RLock()
@ -1981,3 +2012,9 @@ type byTagKey []*influxql.TagSet
func (t byTagKey) Len() int { return len(t) }
func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
// bytesSlice implements sort.Interface for a slice of byte slices.
type bytesSlice [][]byte

func (t bytesSlice) Len() int { return len(t) }
func (t bytesSlice) Less(i, j int) bool { return bytes.Compare(t[i], t[j]) < 0 }
func (t bytesSlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
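A quick usage sketch of the `bytesSlice` pattern: implementing `sort.Interface` was the standard way to sort a `[][]byte` at the time (this commit predates `sort.Slice`, which arrived in Go 1.8). The type is reproduced locally here just to show the call shape:

```go
package main

import (
    "bytes"
    "fmt"
    "sort"
)

// bytesSlice sorts a slice of byte slices lexicographically.
type bytesSlice [][]byte

func (t bytesSlice) Len() int           { return len(t) }
func (t bytesSlice) Less(i, j int) bool { return bytes.Compare(t[i], t[j]) < 0 }
func (t bytesSlice) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }

func main() {
    keys := [][]byte{[]byte("region"), []byte("host"), []byte("dc")}
    sort.Sort(bytesSlice(keys))
    for _, k := range keys {
        fmt.Println(string(k)) // dc, host, region
    }
}
```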

View File

@ -255,7 +255,7 @@ func (s *Shard) Open() error {
        s.logger.Printf("%s database index loaded in %s", s.path, time.Now().Sub(start))

        go s.monitor()

        return nil
    }(); err != nil {
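`Shard.Open` now starts a single background `monitor` goroutine (replacing the old `monitorSize`). A minimal sketch of the ticker-plus-close-channel loop it follows, using a hypothetical free-standing `monitor` function rather than the shard method:

```go
package main

import (
    "fmt"
    "time"
)

// monitor runs periodic checks until the closing channel is closed.
func monitor(closing <-chan struct{}) {
    t := time.NewTicker(time.Minute)
    defer t.Stop()

    for {
        select {
        case <-closing:
            return // shard is shutting down
        case <-t.C:
            fmt.Println("run periodic checks (disk size, tag cardinality)")
        }
    }
}

func main() {
    closing := make(chan struct{})
    go monitor(closing)
    time.Sleep(10 * time.Millisecond)
    close(closing) // stops the goroutine on its next select
}
```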
@ -469,6 +469,29 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate, error) {
    var fieldsToCreate []*FieldCreate

    if s.options.Config.MaxValuesPerTag > 0 {
        for _, p := range points {
            tags := p.Tags()

            // Tag value cardinality limit
            m := s.index.Measurement(p.Name())

            // Measurement doesn't exist yet, can't check the limit
            if m != nil {
                for _, tag := range tags {
                    // If the tag value already exists, skip the limit check
                    if m.HasTagKeyValue(tag.Key, tag.Value) {
                        continue
                    }

                    n := m.Cardinality(tag.Key)
                    if n >= s.options.Config.MaxValuesPerTag {
                        return nil, fmt.Errorf("max values per tag exceeded: %s, %v=%v: %d, limit %d",
                            m.Name, string(tag.Key), string(tag.Value), n, s.options.Config.MaxValuesPerTag)
                    }
                }
            }
        }
    }

    // get the shard mutex for locally defined fields
    for _, p := range points {
        // verify the tags and fields
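The rule this block enforces: a point is rejected only when it would introduce a previously unseen tag value while the key already holds `max-values-per-tag` values; writes that reuse existing tag values always pass, so existing data keeps loading and querying. A minimal sketch of that decision in isolation, using a hypothetical `wouldExceedTagLimit` helper and a toy map instead of the shard's index:

```go
package main

import (
    "errors"
    "fmt"
)

// wouldExceedTagLimit mirrors the write-path check: reusing an existing value is
// always allowed; a new value is rejected once the key already holds limit values.
func wouldExceedTagLimit(existing map[string]map[string]bool, key, value string, limit int) error {
    if limit <= 0 {
        return nil // limit disabled
    }
    vals := existing[key]
    if vals[value] {
        return nil // value already present, no cardinality growth
    }
    if len(vals) >= limit {
        return errors.New("max values per tag exceeded")
    }
    return nil
}

func main() {
    existing := map[string]map[string]bool{
        "host": {"server01": true, "server02": true},
    }
    fmt.Println(wouldExceedTagLimit(existing, "host", "server01", 2)) // <nil>: reuse is fine
    fmt.Println(wouldExceedTagLimit(existing, "host", "server03", 2)) // rejected: new value at the limit
}
```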
@ -756,9 +779,11 @@ func (s *Shard) CreateSnapshot() (string, error) {
    return s.engine.CreateSnapshot()
}

// monitor periodically records the shard's disk size and checks tag value
// cardinality, logging a warning as the max-values-per-tag limit is approached.
func (s *Shard) monitor() {
    t := time.NewTicker(monitorStatInterval)
    defer t.Stop()

    t2 := time.NewTicker(time.Minute)
    defer t2.Stop()

    for {
        select {
        case <-s.closing:
@ -770,6 +795,26 @@ func (s *Shard) monitorSize() {
                continue
            }
            atomic.StoreInt64(&s.stats.DiskBytes, size)
        case <-t2.C:
            if s.options.Config.MaxValuesPerTag == 0 {
                continue
            }

            for _, m := range s.index.Measurements() {
                for _, k := range m.TagKeysBytes() {
                    n := m.Cardinality(k)
                    perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
                    if perc > 100 {
                        perc = 100
                    }

                    // Log at 80, 85, 90-100% levels
                    if perc == 80 || perc == 85 || perc >= 90 {
                        s.logger.Printf("WARN: %d%% of tag values limit reached: (%d/%d), db=%s measurement=%s tag=%s",
                            perc, n, s.options.Config.MaxValuesPerTag, s.database, m.Name, k)
                    }
                }
            }
        }
    }
}
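The once-a-minute `t2` check walks every measurement and tag key and logs a warning when usage reaches 80%, 85%, or anywhere from 90% to 100% of `max-values-per-tag` (the percentage is capped at 100). A small standalone sketch of just that threshold logic, using a hypothetical `cardinalityWarning` helper that is not part of the shard code:

```go
package main

import "fmt"

// cardinalityWarning returns the usage percentage (capped at 100) and whether it
// lands on a warning threshold: exactly 80, exactly 85, or anything from 90 up.
func cardinalityWarning(n, limit int) (perc int, warn bool) {
    perc = int(float64(n) / float64(limit) * 100)
    if perc > 100 {
        perc = 100
    }
    return perc, perc == 80 || perc == 85 || perc >= 90
}

func main() {
    limit := 100000
    for _, n := range []int{50000, 80000, 92000, 150000} {
        perc, warn := cardinalityWarning(n, limit)
        fmt.Printf("n=%-6d -> %3d%% warn=%v\n", n, perc, warn)
    }
    // n=50000  ->  50% warn=false
    // n=80000  ->  80% warn=true
    // n=92000  ->  92% warn=true
    // n=150000 -> 100% warn=true
}
```

Because anything at or above 90% warns on every tick, a tag key that jumps past the exact 80% or 85% samples between checks still produces a warning once it enters the 90-100% band.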