From 3681bc8a43fd9cc82e074d7e2d293d66166fa792 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 20 Oct 2016 14:15:31 -0500 Subject: [PATCH] Filter out series within shards that do not have data for that series Previously, we would return a full tag set for every shard and the tag set would include all series that existed in the database index, including series that didn't physically exist within that shard. This led to the tag sets returned being incredibly huge when we had high cardinality but sparse data. Since the data was sparse, most people did not expect it to cause such a large strain on the system. Now we filter out the series ids that are not assigned to the current shard when computing a tag set for that shard. This lowers the memory usage for high cardinality sparse data drastically and allows queries on those to complete successfully. This does not resolve issues for high cardinality data in every shard that is also spread out over a long period of time. That situation isn't nearly as common as the above situation though. --- CHANGELOG.md | 1 + tsdb/engine.go | 9 +++++---- tsdb/engine/tsm1/engine.go | 6 ++++-- tsdb/engine/tsm1/engine_test.go | 35 ++++++++++++++++++++++----------- tsdb/meta.go | 5 ++++- tsdb/shard.go | 2 +- 6 files changed, 39 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c39aff9132..d8d878343e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent. - [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time. - [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors. +- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series. 
### Bugfixes diff --git a/tsdb/engine.go b/tsdb/engine.go index 70fc9bd51c..b4fe3cfa8a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -62,7 +62,7 @@ const ( ) // NewEngineFunc creates a new engine. -type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine +type NewEngineFunc func(id uint64, path string, walPath string, options EngineOptions) Engine // newEngineFuncs is a lookup of engine constructors by name. var newEngineFuncs = make(map[string]NewEngineFunc) @@ -87,10 +87,10 @@ func RegisteredEngines() []string { // NewEngine returns an instance of an engine based on its format. // If the path does not exist then the DefaultFormat is used. -func NewEngine(path string, walPath string, options EngineOptions) (Engine, error) { +func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error) { // Create a new engine if _, err := os.Stat(path); os.IsNotExist(err) { - return newEngineFuncs[options.EngineVersion](path, walPath, options), nil + return newEngineFuncs[options.EngineVersion](id, path, walPath, options), nil } // If it's a dir then it's a tsm1 engine @@ -109,12 +109,13 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro return nil, fmt.Errorf("invalid engine format: %q", format) } - return fn(path, walPath, options), nil + return fn(id, path, walPath, options), nil } // EngineOptions represents the options used to initialize the engine. 
type EngineOptions struct { EngineVersion string + ShardID uint64 Config Config } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index ad8d9ab69c..248ba35f2b 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -91,6 +91,7 @@ type Engine struct { snapDone chan struct{} // channel to signal snapshot compactions to stop snapWG sync.WaitGroup // waitgroup for running snapshot compactions + id uint64 path string logger *log.Logger // Logger to be used for important messages traceLogger *log.Logger // Logger to be used when trace-logging is on. @@ -125,7 +126,7 @@ type Engine struct { } // NewEngine returns a new instance of Engine. -func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine { +func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine { w := NewWAL(walPath) fs := NewFileStore(path) cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path) @@ -136,6 +137,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine } e := &Engine{ + id: id, path: path, logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags), traceLogger: log.New(ioutil.Discard, "[tsm1] ", log.LstdFlags), @@ -1268,7 +1270,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo for _, mm := range mms { // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition) + tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition) if err != nil { return err } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 7bd6b8e620..87dd6b21f3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -146,7 +146,7 @@ func TestEngine_Backup(t *testing.T) { p3 := MustParsePointString("cpu,host=C value=1.3 3000000000") // Write those points to the engine. 
- e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) + e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} @@ -222,7 +222,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) + if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -275,7 +277,9 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) + if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -328,7 +332,9 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) + if err := e.WritePointsString( `cpu,host=A 
value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -382,7 +388,9 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) + if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -437,7 +445,9 @@ func TestEngine_CreateIterator_Aux(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) + if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A F=100 1000000000`, @@ -497,7 +507,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) + if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A X=10 
1000000000`, @@ -560,7 +572,7 @@ func TestEngine_DeleteSeries(t *testing.T) { p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") // Write those points to the engine. - e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) + e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} @@ -721,7 +733,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine { // Initialize metadata. e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) + si.AssignShard(1) // Generate time ascending points with jitterred time & value. rand := rand.New(rand.NewSource(0)) @@ -770,7 +783,7 @@ func NewEngine() *Engine { panic(err) } return &Engine{ - Engine: tsm1.NewEngine( + Engine: tsm1.NewEngine(1, filepath.Join(root, "data"), filepath.Join(root, "wal"), tsdb.NewEngineOptions()).(*tsm1.Engine), @@ -802,7 +815,7 @@ func (e *Engine) Reopen() error { return err } - e.Engine = tsm1.NewEngine( + e.Engine = tsm1.NewEngine(1, filepath.Join(e.root, "data"), filepath.Join(e.root, "wal"), tsdb.NewEngineOptions()).(*tsm1.Engine) diff --git a/tsdb/meta.go b/tsdb/meta.go index f90103fb82..13a6da3d31 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -771,7 +771,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf // This will also populate the TagSet objects with the series IDs that match each tagset and any // influx filter expression that goes with the series // TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. 
-func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { +func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { m.mu.RLock() // get the unique set of series ids and the filters that should be applied to each @@ -787,6 +787,9 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]* tagSets := make(map[string]*influxql.TagSet, 64) for _, id := range ids { s := m.seriesByID[id] + if !s.Assigned(shardID) { + continue + } tags := make(map[string]string, len(dimensions)) // Build the TagSet for this series. diff --git a/tsdb/shard.go b/tsdb/shard.go index 1e7306d22d..b301bc3155 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -240,7 +240,7 @@ func (s *Shard) Open() error { } // Initialize underlying engine. - e, err := NewEngine(s.path, s.walPath, s.options) + e, err := NewEngine(s.id, s.path, s.walPath, s.options) if err != nil { return err }