Filter out series within shards that do not have data for that series

Previously, we would return a full tag set for every shard and the tag
set would include all series that existed in the database index
including series that didn't physically exist within that shard. This
led to the tag sets returned being incredibly huge when we had high
cardinality but sparse data. Since the data was sparse, it was
unexpected that it would cause such a large strain on the system by most
people.

Now we filter out the series ids that are not assigned to the current
shard when computing a tag set for that shard. This lowers the memory
usage for high cardinality sparse data drastically and allows queries on
those to complete successfully.

This does not resolve issues for high cardinality data in every shard
that is also spread out over a long series of time. That situation isn't
nearly as common as the above situation though.
pull/7496/head
Jonathan A. Sternberg 2016-10-20 14:15:31 -05:00
parent 6fd74a6a2a
commit 3681bc8a43
6 changed files with 39 additions and 19 deletions

View File

@ -30,6 +30,7 @@
- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent.
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.
- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors.
- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series.
### Bugfixes

View File

@ -62,7 +62,7 @@ const (
)
// NewEngineFunc creates a new engine.
type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine
type NewEngineFunc func(id uint64, path string, walPath string, options EngineOptions) Engine
// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)
@ -87,10 +87,10 @@ func RegisteredEngines() []string {
// NewEngine returns an instance of an engine based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewEngine(path string, walPath string, options EngineOptions) (Engine, error) {
func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[options.EngineVersion](path, walPath, options), nil
return newEngineFuncs[options.EngineVersion](id, path, walPath, options), nil
}
// If it's a dir then it's a tsm1 engine
@ -109,12 +109,13 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
return nil, fmt.Errorf("invalid engine format: %q", format)
}
return fn(path, walPath, options), nil
return fn(id, path, walPath, options), nil
}
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
ShardID uint64
Config Config
}

View File

@ -91,6 +91,7 @@ type Engine struct {
snapDone chan struct{} // channel to signal snapshot compactions to stop
snapWG sync.WaitGroup // waitgroup for running snapshot compactions
id uint64
path string
logger *log.Logger // Logger to be used for important messages
traceLogger *log.Logger // Logger to be used when trace-logging is on.
@ -125,7 +126,7 @@ type Engine struct {
}
// NewEngine returns a new instance of Engine.
func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath)
fs := NewFileStore(path)
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)
@ -136,6 +137,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
}
e := &Engine{
id: id,
path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
traceLogger: log.New(ioutil.Discard, "[tsm1] ", log.LstdFlags),
@ -1268,7 +1270,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo
for _, mm := range mms {
// Determine tagsets for this measurement based on dimensions and filters.
tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition)
tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition)
if err != nil {
return err
}

View File

@ -146,7 +146,7 @@ func TestEngine_Backup(t *testing.T) {
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")
// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -222,7 +222,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -275,7 +277,9 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -328,7 +332,9 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -382,7 +388,9 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -437,7 +445,9 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
@ -497,7 +507,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
@ -560,7 +572,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -721,7 +733,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
// Initialize metadata.
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
// Generate time ascending points with jitterred time & value.
rand := rand.New(rand.NewSource(0))
@ -770,7 +783,7 @@ func NewEngine() *Engine {
panic(err)
}
return &Engine{
Engine: tsm1.NewEngine(
Engine: tsm1.NewEngine(1,
filepath.Join(root, "data"),
filepath.Join(root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine),
@ -802,7 +815,7 @@ func (e *Engine) Reopen() error {
return err
}
e.Engine = tsm1.NewEngine(
e.Engine = tsm1.NewEngine(1,
filepath.Join(e.root, "data"),
filepath.Join(e.root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine)

View File

@ -771,7 +771,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
m.mu.RLock()
// get the unique set of series ids and the filters that should be applied to each
@ -787,6 +787,9 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
tagSets := make(map[string]*influxql.TagSet, 64)
for _, id := range ids {
s := m.seriesByID[id]
if !s.Assigned(shardID) {
continue
}
tags := make(map[string]string, len(dimensions))
// Build the TagSet for this series.

View File

@ -240,7 +240,7 @@ func (s *Shard) Open() error {
}
// Initialize underlying engine.
e, err := NewEngine(s.path, s.walPath, s.options)
e, err := NewEngine(s.id, s.path, s.walPath, s.options)
if err != nil {
return err
}