diff --git a/tsdb/index_test.go b/tsdb/index_test.go index c1468ba88d..a2478c7287 100644 --- a/tsdb/index_test.go +++ b/tsdb/index_test.go @@ -120,12 +120,6 @@ func (i *Index) MustOpen() { } } -func (idx *Index) AddSeries(name string, tags map[string]string, typ models.FieldType) error { - t := models.NewTags(tags) - key := fmt.Sprintf("%s,%s", name, t.HashKey()) - return idx.CreateSeriesIfNotExists([]byte(key), []byte(name), t, typ) -} - // Reopen closes and re-opens the underlying index, without removing any data. func (i *Index) Reopen() error { if err := i.Index.Close(); err != nil { diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index 4e8260611c..2f1d1805e2 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -666,51 +666,6 @@ func (i *Index) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) e return nil } -// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted. -// TODO(edd): This should go. -func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error { - collection := &tsdb.SeriesCollection{ - Keys: [][]byte{key}, - Names: [][]byte{name}, - Tags: []models.Tags{tags}, - Types: []models.FieldType{typ}, - } - err := i.sfile.CreateSeriesListIfNotExists(collection) - if err != nil { - return err - } - ids, err := i.partition(key).createSeriesListIfNotExists(collection) - if err != nil { - return err - } - - if len(ids) == 0 || ids[0].IsZero() { - return nil // No new series, nothing further to update. - } - - // If there are cached sets for any of the tag pairs, they will need to be - // updated with the series id. - i.tagValueCache.RLock() - if i.tagValueCache.measurementContainsSets(name) { - for _, pair := range tags { - // TODO(edd): It's not clear to me yet whether it will be better to take a lock - // on every series id set, or whether to gather them all up under the cache rlock - // and then take the cache lock and update them all at once (without invoking a lock - // on each series id set). - // - // Taking the cache lock will block all queries, but is one lock. Taking each series set - // lock might be many lock/unlocks but will only block a query that needs that particular set. - // - // Need to think on it, but I think taking a lock on each series id set is the way to go. - // - // Note this will only add `id` to the set if it exists. - i.tagValueCache.addToSet(name, pair.Key, pair.Value, ids[0]) // Takes a lock on the series id set - } - } - i.tagValueCache.RUnlock() - return nil -} - // InitializeSeries is a no-op. This only applies to the in-memory index. func (i *Index) InitializeSeries(*tsdb.SeriesCollection) error { return nil diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 678ba6227b..776df116fa 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -1107,10 +1107,6 @@ func (e *Engine) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) return e.index.CreateSeriesListIfNotExists(collection) } -func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error { - return e.index.CreateSeriesIfNotExists(key, name, tags, typ) -} - // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done. func (e *Engine) WriteSnapshot() error { // Lock and grab the cache snapshot along with all the closed WAL diff --git a/tsdb/tsm1/engine_test.go b/tsdb/tsm1/engine_test.go index 5d2da0c45f..0a4fa42acb 100644 --- a/tsdb/tsm1/engine_test.go +++ b/tsdb/tsm1/engine_test.go @@ -177,15 +177,10 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { } defer e.Close() - for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { - if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil { - t.Fatalf("create series index error: %v", err) - } - } - - if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { + if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } + if err := e.WriteSnapshot(); err != nil { t.Fatalf("failed to snapshot: %s", err.Error()) } @@ -281,15 +276,10 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { } defer e.Close() - for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { - if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil { - t.Fatalf("create series index error: %v", err) - } - } - - if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { + if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } + if err := e.WriteSnapshot(); err != nil { t.Fatalf("failed to snapshot: %s", err.Error()) } @@ -401,15 +391,10 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) { } defer e.Close() - for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { - if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil { - t.Fatalf("create series index error: %v", err) - } - } - - if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { + if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } + if err := e.WriteSnapshot(); err != nil { t.Fatalf("failed to snapshot: %s", err.Error()) } @@ -483,15 +468,10 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) { } defer e.Close() - for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { - if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil { - t.Fatalf("create series index error: %v", err) - } - } - - if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { + if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } + if err := e.WriteSnapshot(); err != nil { t.Fatalf("failed to snapshot: %s", err.Error()) } @@ -596,15 +576,10 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { } defer e.Close() - for _, p := range []models.Point{p1} { - if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil { - t.Fatalf("create series index error: %v", err) - } - } - - if err := e.WritePoints([]models.Point{p1}); err != nil { + if err := e.writePoints(p1); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } + if err := e.WriteSnapshot(); err != nil { t.Fatalf("failed to snapshot: %s", err.Error()) } @@ -1050,13 +1025,10 @@ func (e *Engine) WritePointsString(ptstr ...string) error { // writePoints adds the series for the provided points to the index, and writes // the point data to the engine. func (e *Engine) writePoints(points ...models.Point) error { - for _, point := range points { - // Write into the index. - iter := point.FieldIterator() - iter.Next() - if err := e.Engine.CreateSeriesIfNotExists(point.Key(), point.Name(), point.Tags(), iter.Type()); err != nil { - return err - } + // Write into the index. + collection := tsdb.NewSeriesCollection(points) + if err := e.CreateSeriesListIfNotExists(collection); err != nil { + return err } // Write the points into the cache/wal. return e.WritePoints(points)