Fix database and tag limits.

pull/8224/head
Ben Johnson 2017-03-24 09:48:10 -06:00
parent 631681796d
commit 9fb8f1ec1d
No known key found for this signature in database
GPG Key ID: 81741CD251883081
9 changed files with 96 additions and 41 deletions

View File

@ -41,6 +41,10 @@ const (
// block in a TSM file
DefaultMaxPointsPerBlock = 1000
// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
// This limit only applies to the "inmem" index.
DefaultMaxSeriesPerDatabase = 100000
// DefaultMaxSeriesPerShard is the maximum number of series a node can hold per shard.
DefaultMaxSeriesPerShard = 100000
@ -73,6 +77,11 @@ type Config struct {
// Limits
// MaxSeriesPerDatabase is the maximum number of series a node can hold per database.
// When this limit is exceeded, writes return a 'max series per database exceeded' error.
// A value of 0 disables the limit. This limit only applies when using the "inmem" index.
MaxSeriesPerDatabase int `toml:"max-series-per-shard"`
// MaxSeriesPerShard is the maximum number of series a node can hold per shard.
// When this limit is exceeded, writes return a 'max series per shard exceeded' error.
// A value of 0 disables the limit.
@ -99,6 +108,7 @@ func NewConfig() Config {
CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxSeriesPerShard: DefaultMaxSeriesPerShard,
MaxValuesPerTag: DefaultMaxValuesPerTag,
@ -149,6 +159,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
"compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
"max-series-per-database": c.MaxSeriesPerDatabase,
"max-series-per-shard": c.MaxSeriesPerShard,
"max-values-per-tag": c.MaxValuesPerTag,
}), nil

View File

@ -60,6 +60,7 @@ type Engine interface {
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)

View File

@ -327,6 +327,10 @@ func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error
return e.index.ForEachMeasurementTagKey(name, fn)
}
func (e *Engine) TagKeyCardinality(name, key []byte) int {
return e.index.TagKeyCardinality(name, key)
}
// SeriesN returns the unique number of series in the index.
func (e *Engine) SeriesN() int64 {
return e.index.SeriesN()

View File

@ -35,6 +35,7 @@ type Index interface {
TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL system iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)

View File

@ -155,6 +155,12 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m
return nil
}
// Verify that the series will not exceed limit.
if max := opt.Config.MaxSeriesPerDatabase; max > 0 && len(i.series)+1 > max {
i.mu.Unlock()
return errMaxSeriesPerDatabaseExceeded
}
// set the in memory ID for query processing on this shard
// The series key and tags are clone to prevent a memory leak
series := tsdb.NewSeries([]byte(string(key)), tags.Clone())
@ -272,6 +278,18 @@ func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error)
return nil
}
// TagKeyCardinality returns the number of values for a measurement/tag key.
func (i *Index) TagKeyCardinality(name, key []byte) int {
i.mu.RLock()
defer i.mu.RUnlock()
mm := i.measurements[string(name)]
if mm == nil {
return 0
}
return mm.CardinalityBytes(key)
}
// TagsForSeries returns the tag map for the passed in series
func (i *Index) TagsForSeries(key string) (models.Tags, error) {
i.mu.RLock()
@ -704,6 +722,7 @@ type ShardIndex struct {
func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error {
var reason string
var dropped int
var droppedKeys map[string]struct{}
// Ensure that no tags go over the maximum cardinality.
if maxValuesPerTag := idx.opt.Config.MaxValuesPerTag; maxValuesPerTag > 0 {
@ -726,7 +745,12 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
dropped++
reason = fmt.Sprintf("max-values-per-tag limit exceeded (%d/%d): measurement=%q tag=%q value=%q",
n, maxValuesPerTag, name, string(tag.Key), string(tag.Key))
n, maxValuesPerTag, name, string(tag.Key), string(tag.Value))
if droppedKeys == nil {
droppedKeys = make(map[string]struct{})
}
droppedKeys[string(keys[i])] = struct{}{}
continue outer
}
@ -741,7 +765,15 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
// Write
for i := range keys {
if err := idx.CreateSeriesIfNotExists(keys[i], names[i], tagsSlice[i]); err != nil {
if err := idx.CreateSeriesIfNotExists(keys[i], names[i], tagsSlice[i]); err == errMaxSeriesPerDatabaseExceeded {
dropped++
reason = fmt.Sprintf("max-series-per-database limit exceeded: (%d)", idx.opt.Config.MaxSeriesPerDatabase)
if droppedKeys == nil {
droppedKeys = make(map[string]struct{})
}
droppedKeys[string(keys[i])] = struct{}{}
continue
} else if err != nil {
return err
}
}
@ -749,8 +781,9 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
// Report partial writes back to shard.
if dropped > 0 {
return &tsdb.PartialWriteError{
Dropped: dropped,
Reason: reason,
Dropped: dropped,
DroppedKeys: droppedKeys,
}
}
@ -847,3 +880,7 @@ func (itr *seriesPointIterator) nextKeys() error {
return nil
}
}
// errMaxSeriesPerDatabaseExceeded is a marker error returned during series creation
// to indicate that a new series would exceed the limits of the database.
var errMaxSeriesPerDatabaseExceeded = errors.New("max series per database exceeded")

View File

@ -623,6 +623,12 @@ func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error)
return nil
}
// TagKeyCardinality always returns zero.
// It is not possible to determine cardinality of tags across index files.
func (i *Index) TagKeyCardinality(name, key []byte) int {
return 0
}
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
fs := i.RetainFileSet()

View File

@ -93,6 +93,9 @@ func (e ShardError) Error() string {
type PartialWriteError struct {
Reason string
Dropped int
// The set of series keys that were dropped. Can be nil.
DroppedKeys map[string]struct{}
}
func (e PartialWriteError) Error() string {
@ -293,10 +296,6 @@ func (s *Shard) Open() error {
}
s.engine = e
// TODO(benbjohnson):
// count := s.index.SeriesShardN(s.id)
// atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
go s.monitor()
return nil
@ -553,11 +552,13 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
}
// Add new series. Check for partial writes.
var droppedKeys map[string]struct{}
if err := s.engine.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil {
switch err := err.(type) {
case *PartialWriteError:
reason = err.Reason
dropped += err.Dropped
droppedKeys = err.DroppedKeys
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return nil, nil, err
@ -585,17 +586,13 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
iter.Reset()
// TODO(edd): we need to use the fast-series-check to determine if
// adding the series would send us over the limit for the shard.
//
// Replace false with the predicate that determines if p.Key() already
// exists in the shard.
if s.options.Config.MaxSeriesPerShard > 0 && false && s.engine.SeriesN()+1 > int64(s.options.Config.MaxSeriesPerShard) {
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
dropped++
reason = fmt.Sprintf("max-series-per-shard limit (%d) will be exceeded: db=%s, shard=%d", s.options.Config.MaxSeriesPerShard, s.database, s.id)
// Skip points if keys have been dropped.
// The drop count has already been incremented during series creation.
if droppedKeys != nil {
if _, ok := droppedKeys[string(keys[i])]; ok {
continue
}
}
// see if the field definitions need to be saved to the shard
mf := s.engine.MeasurementFields(p.Name())
@ -992,9 +989,7 @@ func (s *Shard) monitor() {
for _, name := range names {
s.engine.ForEachMeasurementTagKey(name, func(k []byte) error {
// TODO(benbjohnson): Add sketches for cardinality.
/*
n := s.engine.Cardinality(k)
n := s.engine.TagKeyCardinality(name, k)
perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
@ -1003,9 +998,8 @@ func (s *Shard) monitor() {
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k))
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k))
}
*/
return nil
})
}

View File

@ -92,9 +92,6 @@ func TestShardWriteAndIndex(t *testing.T) {
}
func TestMaxSeriesLimit(t *testing.T) {
t.Skip("TODO(edd): AWAITING SERIES CHECK FUNCTIONALITY")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "db", "rp", "1")
@ -102,7 +99,8 @@ func TestMaxSeriesLimit(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxSeriesPerShard = 1000
opts.Config.MaxSeriesPerDatabase = 1000
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
@ -139,7 +137,7 @@ func TestMaxSeriesLimit(t *testing.T) {
err = sh.WritePoints([]models.Point{pt})
if err == nil {
t.Fatal("expected error")
} else if exp, got := `db=db: max-series-per-database limit exceeded: (1000/1000) dropped=1`, err.Error(); exp != got {
} else if exp, got := `max-series-per-database limit exceeded: (1000) dropped=1`, err.Error(); exp != got {
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
}
@ -147,9 +145,6 @@ func TestMaxSeriesLimit(t *testing.T) {
}
func TestShard_MaxTagValuesLimit(t *testing.T) {
t.Skip("TODO(edd): not performant enough yet")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "db", "rp", "1")

View File

@ -558,6 +558,7 @@ func TestStore_Cardinality_Unique_Inmem(t *testing.T) {
store := NewStore()
store.EngineOptions.Config.Index = "inmem"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}
@ -570,6 +571,7 @@ func TestStore_Cardinality_Unique_TSI1(t *testing.T) {
store := NewStore()
store.EngineOptions.Config.Index = "tsi1"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}
@ -646,6 +648,7 @@ func TestStore_Cardinality_Duplicates_Inmem(t *testing.T) {
store := NewStore()
store.EngineOptions.Config.Index = "inmem"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}
@ -658,6 +661,7 @@ func TestStore_Cardinality_Duplicates_TSI1(t *testing.T) {
store := NewStore()
store.EngineOptions.Config.Index = "tsi1"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}
@ -720,6 +724,7 @@ func TestStore_Cardinality_Compactions_Inmem(t *testing.T) {
store := NewStore()
store.EngineOptions.Config.Index = "inmem"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}
@ -732,6 +737,7 @@ func TestStore_Cardinality_Compactions_TSI1(t *testing.T) {
store := NewStore()
store.EngineOptions.Config.Index = "tsi1"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
}