From 3318c94a2f47783a6c77a1f66035bcbd90d5679b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 8 Dec 2017 11:38:53 +0000 Subject: [PATCH] =?UTF-8?q?Clean=20up=20=F0=9F=9B=81:?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/backup_restore_test.go | 2 - tsdb/engine/tsm1/engine.go | 34 ------------- tsdb/engine/tsm1/engine_test.go | 69 +++++++++---------------- tsdb/index.go | 6 --- tsdb/index/tsi1/file_set_test.go | 87 ++++++++++++++++++-------------- tsdb/index/tsi1/index.go | 18 ------- tsdb/index/tsi1/partition.go | 12 ----- 7 files changed, 75 insertions(+), 153 deletions(-) diff --git a/tests/backup_restore_test.go b/tests/backup_restore_test.go index f04c09b152..7d7962f38c 100644 --- a/tests/backup_restore_test.go +++ b/tests/backup_restore_test.go @@ -55,7 +55,6 @@ func TestServer_BackupAndRestore(t *testing.T) { if res != expected { t.Fatalf("query results wrong:\n\texp: %s\n\tgot: %s", expected, res) } - return // TEMP // now backup cmd := backup.NewCommand() @@ -68,7 +67,6 @@ func TestServer_BackupAndRestore(t *testing.T) { t.Fatalf("error backing up: %s, hostAddress: %s", err.Error(), hostAddress) } }() - return // TEMP if _, err := os.Stat(config.Meta.Dir); err == nil || !os.IsNotExist(err) { t.Fatalf("meta dir should be deleted") diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 77bb5dcb80..2618ce50c8 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -351,12 +351,6 @@ func (e *Engine) MeasurementExists(name []byte) (bool, error) { return e.index.MeasurementExists(name) } -/* -func (e *Engine) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { - return e.index.MeasurementNamesByExpr(expr) -} -*/ - func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { return e.index.MeasurementNamesByRegex(re) } @@ -379,21 +373,6 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[ return e.index.MeasurementTagKeysByExpr(name, expr) } -// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. -// -// MeasurementTagKeyValuesByExpr relies on the provided tag keys being sorted. -// The caller can indicate the tag keys have been sorted by setting the -// keysSorted argument appropriately. Tag values are returned in a slice that -// is indexible according to the sorted order of the tag keys, e.g., the values -// for the earliest tag k will be available in index 0 of the returned values -// slice. -// -/* -func (e *Engine) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - return e.index.MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted) -} -*/ - func (e *Engine) TagKeyCardinality(name, key []byte) int { return e.index.TagKeyCardinality(name, key) } @@ -1231,19 +1210,6 @@ func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error { return e.index.ForEachMeasurementName(fn) } -/* -func (e *Engine) MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) { - return e.index.MeasurementSeriesKeysByExprIterator(name, expr) -} -*/ - -/* -// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. -func (e *Engine) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - return e.index.MeasurementSeriesKeysByExpr(name, expr) -} -*/ - func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error { return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice) } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 868739b365..278b3207fa 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -118,10 +118,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Ensure that deletes only sent to the WAL will clear out the data from the cache on restart func TestEngine_DeleteWALLoadMetadata(t *testing.T) { - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() if err := e.WritePointsString( @@ -263,13 +260,9 @@ func TestEngine_Backup(t *testing.T) { func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() - // e.CreateMeasurement("cpu") e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) @@ -319,10 +312,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -374,10 +364,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -430,10 +417,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -486,10 +470,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { func TestEngine_CreateIterator_Aux(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -545,10 +526,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) { func TestEngine_CreateIterator_Condition(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -755,10 +733,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) { func TestEngine_CreateCursor_Ascending(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -807,10 +782,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) { func TestEngine_CreateCursor_Descending(t *testing.T) { t.Parallel() - sfile := MustOpenSeriesFile() - defer MustCloseSeriesFile(sfile) - - e := MustOpenDefaultEngine(sfile) + e := MustOpenDefaultEngine() defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false) @@ -1140,8 +1112,8 @@ func NewEngine(index string, sfile *tsdb.SeriesFile) *Engine { // MustOpenDefaultEngine returns a new, open instance of Engine using the default // index. Useful when the index is not directly under test. -func MustOpenDefaultEngine(sfile *tsdb.SeriesFile) *Engine { - e := NewEngine(tsdb.DefaultIndex, sfile) +func MustOpenDefaultEngine() *Engine { + e := NewEngine(tsdb.DefaultIndex, MustOpenSeriesFile()) if err := e.Open(); err != nil { panic(err) } @@ -1162,6 +1134,11 @@ func (e *Engine) Close() error { if e.index != nil { e.index.Close() } + + if e.sfile != nil { + MustCloseSeriesFile(e.sfile) + } + defer os.RemoveAll(e.root) return e.Engine.Close() } @@ -1264,20 +1241,24 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) { } // NewSeriesFile returns a new instance of SeriesFile with a temporary file path. -func NewSeriesFile() *tsdb.SeriesFile { +func NewSeriesFile() (*tsdb.SeriesFile, error) { file, err := ioutil.TempFile("", "tsm1-series-file-") if err != nil { - panic(err) + return nil, err } file.Close() - return tsdb.NewSeriesFile(file.Name()) + return tsdb.NewSeriesFile(file.Name()), nil } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. func MustOpenSeriesFile() *tsdb.SeriesFile { - f := NewSeriesFile() - if err := f.Open(); err != nil { + f, err := NewSeriesFile() + if err != nil { + panic(err) + } + + if err = f.Open(); err != nil { panic(err) } return f diff --git a/tsdb/index.go b/tsdb/index.go index 8f2181bbd4..a806af6e8c 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -40,11 +40,8 @@ type Index interface { HasTagKey(name, key []byte) (bool, error) HasTagValue(name, key, value []byte) (bool, error) - // TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) - // MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) - // ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error TagKeyCardinality(name, key []byte) int // InfluxQL system iterators @@ -54,9 +51,6 @@ type Index interface { MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) - // MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (SeriesIDIterator, error) - // MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error) - // SeriesIDIterator(opt query.IteratorOptions) (SeriesIDIterator, error) // Sets a shared fieldset from the engine. FieldSet() *MeasurementFieldSet diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index 12e49ef891..31a43bb2b5 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -54,46 +54,59 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { t.Fatalf("expected eof, got: %d", elem.SeriesID) } }) - /* - // Add more series. - if err := idx.CreateSeriesSliceIfNotExists([]Series{ - {Name: []byte("disk")}, - {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, - {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, - }); err != nil { + + // Add more series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("disk")}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify additional series. + idx.Run(t, func(t *testing.T) { + fs := idx.PartitionAt(0).RetainFileSet() + defer fs.Release() + + itr := fs.SeriesFile().SeriesIDIterator() + if itr == nil { + t.Fatal("expected iterator") + } + + allexpected := []struct { + name string + tagset string + }{ + {`cpu`, `[{region east}]`}, + {`cpu`, `[{region west}]`}, + {`mem`, `[{region east}]`}, + {`disk`, `[]`}, + {`cpu`, `[{region north}]`}, + } + + for _, expected := range allexpected { + e, err := itr.Next() + if err != nil { + t.Fatal(err) + } + + if name, tags := fs.SeriesFile().Series(e.SeriesID); string(name) != expected.name || tags.String() != expected.tagset { + t.Fatalf("unexpected series: %s/%s", name, tags.String()) + } + } + + // Check for end of iterator... + e, err := itr.Next() + if err != nil { t.Fatal(err) } - // Verify additional series. - idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() - defer fs.Release() - - itr := fs.SeriesFile().SeriesIDIterator() - if itr == nil { - t.Fatal("expected iterator") - } - - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `disk` || tags.String() != `[]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if e := itr.Next(); e.SeriesID != 0 { - name, tags := fs.SeriesFile().Series(e.SeriesID) - t.Fatalf("expected eof, got: %s/%s", name, tags.String()) - } - })*/ + if e.SeriesID != 0 { + name, tags := fs.SeriesFile().Series(e.SeriesID) + t.Fatalf("got: %s/%s, but expected EOF", name, tags.String()) + } + }) } // Ensure fileset can return an iterator over all series for one measurement. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 5bb482f0be..e30d1bed58 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -411,15 +411,6 @@ func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, return tsdb.MergeSeriesIDIterators(itrs...), nil } -/* -// MeasurementNamesByExpr returns measurement names for the provided expression. -func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { - return i.fetchByteValues(func(idx int) ([][]byte, error) { - return i.partitions[idx].MeasurementNamesByExpr(expr) - }) -} -*/ - // MeasurementNamesByRegex returns measurement names for the provided regex. func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { return i.fetchByteValues(func(idx int) ([][]byte, error) { @@ -427,15 +418,6 @@ func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { }) } -/* -// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. -func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - return i.fetchByteValues(func(idx int) ([][]byte, error) { - return i.partitions[idx].MeasurementSeriesKeysByExpr(name, expr) - }) -} -*/ - // DropMeasurement deletes a measurement from the index. It returns the first // error encountered, if any. func (i *Index) DropMeasurement(name []byte) error { diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index d7a3209dc6..f9f58e95ee 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -408,18 +408,6 @@ func (i *Partition) MeasurementExists(name []byte) (bool, error) { return m != nil && !m.Deleted(), nil } -/* -func (i *Partition) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { - fs := i.RetainFileSet() - defer fs.Release() - - names, err := fs.MeasurementNamesByExpr(expr) - - // Clone byte slices since they will be used after the fileset is released. - return bytesutil.CloneSlice(names), err -} -*/ - func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { fs := i.RetainFileSet() defer fs.Release()