Merge pull request #1332 from zhulongcheng/rm-create-series
Remove Index.CreateSeriesIfNotExistspull/10616/head
commit
7ccb201b80
|
@ -120,12 +120,6 @@ func (i *Index) MustOpen() {
|
|||
}
|
||||
}
|
||||
|
||||
func (idx *Index) AddSeries(name string, tags map[string]string, typ models.FieldType) error {
|
||||
t := models.NewTags(tags)
|
||||
key := fmt.Sprintf("%s,%s", name, t.HashKey())
|
||||
return idx.CreateSeriesIfNotExists([]byte(key), []byte(name), t, typ)
|
||||
}
|
||||
|
||||
// Reopen closes and re-opens the underlying index, without removing any data.
|
||||
func (i *Index) Reopen() error {
|
||||
if err := i.Index.Close(); err != nil {
|
||||
|
|
|
@ -666,51 +666,6 @@ func (i *Index) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
|
||||
// TODO(edd): This should go.
|
||||
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error {
|
||||
collection := &tsdb.SeriesCollection{
|
||||
Keys: [][]byte{key},
|
||||
Names: [][]byte{name},
|
||||
Tags: []models.Tags{tags},
|
||||
Types: []models.FieldType{typ},
|
||||
}
|
||||
err := i.sfile.CreateSeriesListIfNotExists(collection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ids, err := i.partition(key).createSeriesListIfNotExists(collection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(ids) == 0 || ids[0].IsZero() {
|
||||
return nil // No new series, nothing further to update.
|
||||
}
|
||||
|
||||
// If there are cached sets for any of the tag pairs, they will need to be
|
||||
// updated with the series id.
|
||||
i.tagValueCache.RLock()
|
||||
if i.tagValueCache.measurementContainsSets(name) {
|
||||
for _, pair := range tags {
|
||||
// TODO(edd): It's not clear to me yet whether it will be better to take a lock
|
||||
// on every series id set, or whether to gather them all up under the cache rlock
|
||||
// and then take the cache lock and update them all at once (without invoking a lock
|
||||
// on each series id set).
|
||||
//
|
||||
// Taking the cache lock will block all queries, but is one lock. Taking each series set
|
||||
// lock might be many lock/unlocks but will only block a query that needs that particular set.
|
||||
//
|
||||
// Need to think on it, but I think taking a lock on each series id set is the way to go.
|
||||
//
|
||||
// Note this will only add `id` to the set if it exists.
|
||||
i.tagValueCache.addToSet(name, pair.Key, pair.Value, ids[0]) // Takes a lock on the series id set
|
||||
}
|
||||
}
|
||||
i.tagValueCache.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// InitializeSeries is a no-op. This only applies to the in-memory index.
|
||||
func (i *Index) InitializeSeries(*tsdb.SeriesCollection) error {
|
||||
return nil
|
||||
|
|
|
@ -1107,10 +1107,6 @@ func (e *Engine) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection)
|
|||
return e.index.CreateSeriesListIfNotExists(collection)
|
||||
}
|
||||
|
||||
func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error {
|
||||
return e.index.CreateSeriesIfNotExists(key, name, tags, typ)
|
||||
}
|
||||
|
||||
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
|
||||
func (e *Engine) WriteSnapshot() error {
|
||||
// Lock and grab the cache snapshot along with all the closed WAL
|
||||
|
|
|
@ -177,15 +177,10 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
|
|||
}
|
||||
defer e.Close()
|
||||
|
||||
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
|
||||
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil {
|
||||
t.Fatalf("create series index error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
@ -281,15 +276,10 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
|
|||
}
|
||||
defer e.Close()
|
||||
|
||||
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
|
||||
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil {
|
||||
t.Fatalf("create series index error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
@ -401,15 +391,10 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
|
|||
}
|
||||
defer e.Close()
|
||||
|
||||
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
|
||||
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil {
|
||||
t.Fatalf("create series index error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
@ -483,15 +468,10 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
|
|||
}
|
||||
defer e.Close()
|
||||
|
||||
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
|
||||
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil {
|
||||
t.Fatalf("create series index error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
@ -596,15 +576,10 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
|
|||
}
|
||||
defer e.Close()
|
||||
|
||||
for _, p := range []models.Point{p1} {
|
||||
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags(), models.Float); err != nil {
|
||||
t.Fatalf("create series index error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p1}); err != nil {
|
||||
if err := e.writePoints(p1); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
@ -1050,13 +1025,10 @@ func (e *Engine) WritePointsString(ptstr ...string) error {
|
|||
// writePoints adds the series for the provided points to the index, and writes
|
||||
// the point data to the engine.
|
||||
func (e *Engine) writePoints(points ...models.Point) error {
|
||||
for _, point := range points {
|
||||
// Write into the index.
|
||||
iter := point.FieldIterator()
|
||||
iter.Next()
|
||||
if err := e.Engine.CreateSeriesIfNotExists(point.Key(), point.Name(), point.Tags(), iter.Type()); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write into the index.
|
||||
collection := tsdb.NewSeriesCollection(points)
|
||||
if err := e.CreateSeriesListIfNotExists(collection); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write the points into the cache/wal.
|
||||
return e.WritePoints(points)
|
||||
|
|
Loading…
Reference in New Issue