diff --git a/tests/server_test.go b/tests/server_test.go index fab68ef09a..8c94a2737d 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -40,14 +40,18 @@ func TestMain(m *testing.M) { benchServer = OpenDefaultServer(c) // Run test suite. - fmt.Printf("============= Running all tests for %q index =============\n", indexType) + if testing.Verbose() { + fmt.Printf("============= Running all tests for %q index =============\n", indexType) + } if thisr := m.Run(); r == 0 { r = thisr // We'll always remember the first time r is non-zero } // Cleanup benchServer.Close() - fmt.Println() + if testing.Verbose() { + fmt.Println() + } } os.Exit(r) } @@ -8413,7 +8417,7 @@ func TestServer_Query_LargeTimestamp(t *testing.T) { // Open a new server with the same configuration file. // This is to ensure the meta data was marshaled correctly. s2 := OpenServer((s.(*LocalServer)).Config) - defer s2.Close() + defer s2.(*LocalServer).Server.Close() for _, query := range test.queries { t.Run(query.name, func(t *testing.T) { diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 33f62ae482..824a146ee9 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -601,240 +601,278 @@ func TestShard_Close_RemoveIndex(t *testing.T) { // Ensure a shard can create iterators for its underlying data. func TestShard_CreateIterator_Ascending(t *testing.T) { - sh := NewShard() + var sh *Shard + var itr query.Iterator - // Calling CreateIterator when the engine is not open will return - // ErrEngineClosed. - _, got := sh.CreateIterator("cpu", query.IteratorOptions{}) - if exp := tsdb.ErrEngineClosed; got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } + test := func(index string) { + sh = NewShard(index) - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() + // Calling CreateIterator when the engine is not open will return + // ErrEngineClosed. + _, got := sh.CreateIterator("cpu", query.IteratorOptions{}) + if exp := tsdb.ErrEngineClosed; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } - sh.MustWritePointsString(` + if err := sh.Open(); err != nil { + t.Fatal(err) + } + + sh.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - // Create iterator. - itr, err := sh.CreateIterator("cpu", query.IteratorOptions{ - Expr: influxql.MustParseExpr(`value`), - Aux: []influxql.VarRef{{Val: "val2"}}, - Dimensions: []string{"host"}, - Ascending: true, - StartTime: influxql.MinTime, - EndTime: influxql.MaxTime, - }) - if err != nil { - t.Fatal(err) - } - fitr := itr.(query.FloatIterator) - defer itr.Close() + // Create iterator. + var err error + itr, err = sh.CreateIterator("cpu", query.IteratorOptions{ + Expr: influxql.MustParseExpr(`value`), + Aux: []influxql.VarRef{{Val: "val2"}}, + Dimensions: []string{"host"}, + Ascending: true, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + }) + if err != nil { + t.Fatal(err) + } + fitr := itr.(query.FloatIterator) - // Read values from iterator. - if p, err := fitr.Next(); err != nil { - t.Fatalf("unexpected error(0): %s", err) - } else if !deep.Equal(p, &query.FloatPoint{ - Name: "cpu", - Tags: query.NewTags(map[string]string{"host": "serverA"}), - Time: time.Unix(0, 0).UnixNano(), - Value: 100, - Aux: []interface{}{(*float64)(nil)}, - }) { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) + // Read values from iterator. + if p, err := fitr.Next(); err != nil { + t.Fatalf("unexpected error(0): %s", err) + } else if !deep.Equal(p, &query.FloatPoint{ + Name: "cpu", + Tags: query.NewTags(map[string]string{"host": "serverA"}), + Time: time.Unix(0, 0).UnixNano(), + Value: 100, + Aux: []interface{}{(*float64)(nil)}, + }) { + t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) + } + + if p, err := fitr.Next(); err != nil { + t.Fatalf("unexpected error(1): %s", err) + } else if !deep.Equal(p, &query.FloatPoint{ + Name: "cpu", + Tags: query.NewTags(map[string]string{"host": "serverA"}), + Time: time.Unix(10, 0).UnixNano(), + Value: 50, + Aux: []interface{}{float64(5)}, + }) { + t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) + } + + if p, err := fitr.Next(); err != nil { + t.Fatalf("unexpected error(2): %s", err) + } else if !deep.Equal(p, &query.FloatPoint{ + Name: "cpu", + Tags: query.NewTags(map[string]string{"host": "serverB"}), + Time: time.Unix(0, 0).UnixNano(), + Value: 25, + Aux: []interface{}{(*float64)(nil)}, + }) { + t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) + } } - if p, err := fitr.Next(); err != nil { - t.Fatalf("unexpected error(1): %s", err) - } else if !deep.Equal(p, &query.FloatPoint{ - Name: "cpu", - Tags: query.NewTags(map[string]string{"host": "serverA"}), - Time: time.Unix(10, 0).UnixNano(), - Value: 50, - Aux: []interface{}{float64(5)}, - }) { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - - if p, err := fitr.Next(); err != nil { - t.Fatalf("unexpected error(2): %s", err) - } else if !deep.Equal(p, &query.FloatPoint{ - Name: "cpu", - Tags: query.NewTags(map[string]string{"host": "serverB"}), - Time: time.Unix(0, 0).UnixNano(), - Value: 25, - Aux: []interface{}{(*float64)(nil)}, - }) { - t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) + sh.Close() + itr.Close() } } // Ensure a shard can create iterators for its underlying data. func TestShard_CreateIterator_Descending(t *testing.T) { - sh := NewShard() + var ( + sh *Shard + itr query.Iterator + ) - // Calling CreateIterator when the engine is not open will return - // ErrEngineClosed. - _, got := sh.CreateIterator("cpu", query.IteratorOptions{}) - if exp := tsdb.ErrEngineClosed; got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } + test := func(index string) { + sh = NewShard(index) - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() + // Calling CreateIterator when the engine is not open will return + // ErrEngineClosed. + _, got := sh.CreateIterator("cpu", query.IteratorOptions{}) + if exp := tsdb.ErrEngineClosed; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } - sh.MustWritePointsString(` + if err := sh.Open(); err != nil { + t.Fatal(err) + } + + sh.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - // Create iterator. - itr, err := sh.CreateIterator("cpu", query.IteratorOptions{ - Expr: influxql.MustParseExpr(`value`), - Aux: []influxql.VarRef{{Val: "val2"}}, - Dimensions: []string{"host"}, - Ascending: false, - StartTime: influxql.MinTime, - EndTime: influxql.MaxTime, - }) - if err != nil { - t.Fatal(err) - } - defer itr.Close() - fitr := itr.(query.FloatIterator) + // Create iterator. + var err error + itr, err = sh.CreateIterator("cpu", query.IteratorOptions{ + Expr: influxql.MustParseExpr(`value`), + Aux: []influxql.VarRef{{Val: "val2"}}, + Dimensions: []string{"host"}, + Ascending: false, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + }) + if err != nil { + t.Fatal(err) + } + fitr := itr.(query.FloatIterator) - // Read values from iterator. - if p, err := fitr.Next(); err != nil { - t.Fatalf("unexpected error(0): %s", err) - } else if !deep.Equal(p, &query.FloatPoint{ - Name: "cpu", - Tags: query.NewTags(map[string]string{"host": "serverB"}), - Time: time.Unix(0, 0).UnixNano(), - Value: 25, - Aux: []interface{}{(*float64)(nil)}, - }) { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) + // Read values from iterator. + if p, err := fitr.Next(); err != nil { + t.Fatalf("unexpected error(0): %s", err) + } else if !deep.Equal(p, &query.FloatPoint{ + Name: "cpu", + Tags: query.NewTags(map[string]string{"host": "serverB"}), + Time: time.Unix(0, 0).UnixNano(), + Value: 25, + Aux: []interface{}{(*float64)(nil)}, + }) { + t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) + } + + if p, err := fitr.Next(); err != nil { + t.Fatalf("unexpected error(1): %s", err) + } else if !deep.Equal(p, &query.FloatPoint{ + Name: "cpu", + Tags: query.NewTags(map[string]string{"host": "serverA"}), + Time: time.Unix(10, 0).UnixNano(), + Value: 50, + Aux: []interface{}{float64(5)}, + }) { + t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) + } + + if p, err := fitr.Next(); err != nil { + t.Fatalf("unexpected error(2): %s", err) + } else if !deep.Equal(p, &query.FloatPoint{ + Name: "cpu", + Tags: query.NewTags(map[string]string{"host": "serverA"}), + Time: time.Unix(0, 0).UnixNano(), + Value: 100, + Aux: []interface{}{(*float64)(nil)}, + }) { + t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) + } } - if p, err := fitr.Next(); err != nil { - t.Fatalf("unexpected error(1): %s", err) - } else if !deep.Equal(p, &query.FloatPoint{ - Name: "cpu", - Tags: query.NewTags(map[string]string{"host": "serverA"}), - Time: time.Unix(10, 0).UnixNano(), - Value: 50, - Aux: []interface{}{float64(5)}, - }) { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - - if p, err := fitr.Next(); err != nil { - t.Fatalf("unexpected error(2): %s", err) - } else if !deep.Equal(p, &query.FloatPoint{ - Name: "cpu", - Tags: query.NewTags(map[string]string{"host": "serverA"}), - Time: time.Unix(0, 0).UnixNano(), - Value: 100, - Aux: []interface{}{(*float64)(nil)}, - }) { - t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) + sh.Close() + itr.Close() } } func TestShard_Disabled_WriteQuery(t *testing.T) { - sh := NewShard() - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() + var sh *Shard - sh.SetEnabled(false) + test := func(index string) { + sh = NewShard(index) + if err := sh.Open(); err != nil { + t.Fatal(err) + } - pt := models.MustNewPoint( - "cpu", - models.NewTags(map[string]string{"host": "server"}), - map[string]interface{}{"value": 1.0}, - time.Unix(1, 2), - ) + sh.SetEnabled(false) - err := sh.WritePoints([]models.Point{pt}) - if err == nil { - t.Fatalf("expected shard disabled error") - } - if err != tsdb.ErrShardDisabled { - t.Fatalf(err.Error()) + pt := models.MustNewPoint( + "cpu", + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + + err := sh.WritePoints([]models.Point{pt}) + if err == nil { + t.Fatalf("expected shard disabled error") + } + if err != tsdb.ErrShardDisabled { + t.Fatalf(err.Error()) + } + + _, got := sh.CreateIterator("cpu", query.IteratorOptions{}) + if err == nil { + t.Fatalf("expected shard disabled error") + } + if exp := tsdb.ErrShardDisabled; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + sh.SetEnabled(true) + + err = sh.WritePoints([]models.Point{pt}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if _, err = sh.CreateIterator("cpu", query.IteratorOptions{}); err != nil { + t.Fatalf("unexpected error: %v", got) + } } - _, got := sh.CreateIterator("cpu", query.IteratorOptions{}) - if err == nil { - t.Fatalf("expected shard disabled error") - } - if exp := tsdb.ErrShardDisabled; got != exp { - t.Fatalf("got %v, expected %v", got, exp) - } - - sh.SetEnabled(true) - - err = sh.WritePoints([]models.Point{pt}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if _, err = sh.CreateIterator("cpu", query.IteratorOptions{}); err != nil { - t.Fatalf("unexpected error: %v", got) + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) + sh.Close() } } func TestShard_Closed_Functions(t *testing.T) { - sh := NewShard() - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() + var sh *Shard + test := func(index string) { + sh = NewShard(index) + if err := sh.Open(); err != nil { + t.Fatal(err) + } - pt := models.MustNewPoint( - "cpu", - models.NewTags(map[string]string{"host": "server"}), - map[string]interface{}{"value": 1.0}, - time.Unix(1, 2), - ) + pt := models.MustNewPoint( + "cpu", + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) - if err := sh.WritePoints([]models.Point{pt}); err != nil { - t.Fatalf("unexpected error: %v", err) + if err := sh.WritePoints([]models.Point{pt}); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + sh.Close() + + // Should not panic, just a no-op when shard is closed + if err := sh.ForEachMeasurementTagKey([]byte("cpu"), func(k []byte) error { + return nil + }); err != nil { + t.Fatalf("expected nil: got %v", err) + } + + // Should not panic, just a no-op when shard is closed + if exp, got := 0, sh.TagKeyCardinality([]byte("cpu"), []byte("host")); exp != got { + t.Fatalf("expected nil: exp %v, got %v", exp, got) + } } - sh.Close() - - // Should not panic, just a no-op when shard is closed - if err := sh.ForEachMeasurementTagKey([]byte("cpu"), func(k []byte) error { - return nil - }); err != nil { - t.Fatalf("expected nil: got %v", err) - } - - // Should not panic, just a no-op when shard is closed - if exp, got := 0, sh.TagKeyCardinality([]byte("cpu"), []byte("host")); exp != got { - t.Fatalf("expected nil: exp %v, got %v", exp, got) + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) } } func TestShard_FieldDimensions(t *testing.T) { - sh := NewShard() + var sh *Shard - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() + setup := func(index string) { + sh = NewShard(index) - sh.MustWritePointsString(` + if err := sh.Open(); err != nil { + t.Fatal(err) + } + + sh.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 @@ -842,108 +880,115 @@ mem,host=serverA value=25i 0 mem,host=serverB value=50i,val3=t 10 _reserved,region=uswest value="foo" 0 `) + } - for _, tt := range []struct { - sources []string - f map[string]influxql.DataType - d map[string]struct{} - }{ - { - sources: []string{"cpu"}, - f: map[string]influxql.DataType{ - "value": influxql.Float, - "val2": influxql.Float, + for _, index := range tsdb.RegisteredIndexes() { + setup(index) + for _, tt := range []struct { + sources []string + f map[string]influxql.DataType + d map[string]struct{} + }{ + { + sources: []string{"cpu"}, + f: map[string]influxql.DataType{ + "value": influxql.Float, + "val2": influxql.Float, + }, + d: map[string]struct{}{ + "host": {}, + "region": {}, + }, }, - d: map[string]struct{}{ - "host": {}, - "region": {}, + { + sources: []string{"mem"}, + f: map[string]influxql.DataType{ + "value": influxql.Integer, + "val3": influxql.Boolean, + }, + d: map[string]struct{}{ + "host": {}, + }, }, - }, - { - sources: []string{"mem"}, - f: map[string]influxql.DataType{ - "value": influxql.Integer, - "val3": influxql.Boolean, + { + sources: []string{"cpu", "mem"}, + f: map[string]influxql.DataType{ + "value": influxql.Float, + "val2": influxql.Float, + "val3": influxql.Boolean, + }, + d: map[string]struct{}{ + "host": {}, + "region": {}, + }, }, - d: map[string]struct{}{ - "host": {}, + { + sources: []string{"_fieldKeys"}, + f: map[string]influxql.DataType{ + "fieldKey": influxql.String, + "fieldType": influxql.String, + }, + d: map[string]struct{}{}, }, - }, - { - sources: []string{"cpu", "mem"}, - f: map[string]influxql.DataType{ - "value": influxql.Float, - "val2": influxql.Float, - "val3": influxql.Boolean, + { + sources: []string{"_series"}, + f: map[string]influxql.DataType{ + "key": influxql.String, + }, + d: map[string]struct{}{}, }, - d: map[string]struct{}{ - "host": {}, - "region": {}, + { + sources: []string{"_tagKeys"}, + f: map[string]influxql.DataType{ + "tagKey": influxql.String, + }, + d: map[string]struct{}{}, }, - }, - { - sources: []string{"_fieldKeys"}, - f: map[string]influxql.DataType{ - "fieldKey": influxql.String, - "fieldType": influxql.String, + { + sources: []string{"_reserved"}, + f: map[string]influxql.DataType{ + "value": influxql.String, + }, + d: map[string]struct{}{ + "region": {}, + }, }, - d: map[string]struct{}{}, - }, - { - sources: []string{"_series"}, - f: map[string]influxql.DataType{ - "key": influxql.String, + { + sources: []string{"unknown"}, + f: map[string]influxql.DataType{}, + d: map[string]struct{}{}, }, - d: map[string]struct{}{}, - }, - { - sources: []string{"_tagKeys"}, - f: map[string]influxql.DataType{ - "tagKey": influxql.String, - }, - d: map[string]struct{}{}, - }, - { - sources: []string{"_reserved"}, - f: map[string]influxql.DataType{ - "value": influxql.String, - }, - d: map[string]struct{}{ - "region": {}, - }, - }, - { - sources: []string{"unknown"}, - f: map[string]influxql.DataType{}, - d: map[string]struct{}{}, - }, - } { - name := strings.Join(tt.sources, ",") - t.Run(name, func(t *testing.T) { - f, d, err := sh.FieldDimensions(tt.sources) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + } { + name := fmt.Sprintf("%s_%s", strings.Join(tt.sources, ","), index) + t.Run(name, func(t *testing.T) { + f, d, err := sh.FieldDimensions(tt.sources) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } - if diff := cmp.Diff(tt.f, f, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("unexpected fields:\n%s", diff) - } - if diff := cmp.Diff(tt.d, d, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("unexpected dimensions:\n%s", diff) - } - }) + if diff := cmp.Diff(tt.f, f, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("unexpected fields:\n%s", diff) + } + if diff := cmp.Diff(tt.d, d, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("unexpected dimensions:\n%s", diff) + } + }) + } + sh.Close() } } func TestShard_MapType(t *testing.T) { - sh := NewShard() + var sh *Shard - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() + setup := func(index string) { + sh = NewShard(index) - sh.MustWritePointsString(` + if err := sh.Open(); err != nil { + t.Fatal(err) + } + + sh.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 @@ -951,449 +996,477 @@ mem,host=serverA value=25i 0 mem,host=serverB value=50i,val3=t 10 _reserved,region=uswest value="foo" 0 `) + } - for _, tt := range []struct { - measurement string - field string - typ influxql.DataType - }{ - { - measurement: "cpu", - field: "value", - typ: influxql.Float, - }, - { - measurement: "cpu", - field: "host", - typ: influxql.Tag, - }, - { - measurement: "cpu", - field: "region", - typ: influxql.Tag, - }, - { - measurement: "cpu", - field: "val2", - typ: influxql.Float, - }, - { - measurement: "cpu", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "mem", - field: "value", - typ: influxql.Integer, - }, - { - measurement: "mem", - field: "val3", - typ: influxql.Boolean, - }, - { - measurement: "mem", - field: "host", - typ: influxql.Tag, - }, - { - measurement: "unknown", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_fieldKeys", - field: "fieldKey", - typ: influxql.String, - }, - { - measurement: "_fieldKeys", - field: "fieldType", - typ: influxql.String, - }, - { - measurement: "_fieldKeys", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_series", - field: "key", - typ: influxql.String, - }, - { - measurement: "_series", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_tagKeys", - field: "tagKey", - typ: influxql.String, - }, - { - measurement: "_tagKeys", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_reserved", - field: "value", - typ: influxql.String, - }, - { - measurement: "_reserved", - field: "region", - typ: influxql.Tag, - }, - } { - name := fmt.Sprintf("%s - %s", tt.measurement, tt.field) - t.Run(name, func(t *testing.T) { - typ := sh.MapType(tt.measurement, tt.field) - if have, want := typ, tt.typ; have != want { - t.Errorf("unexpected data type: have=%#v want=%#v", have, want) - } - }) + for _, index := range tsdb.RegisteredIndexes() { + setup(index) + for _, tt := range []struct { + measurement string + field string + typ influxql.DataType + }{ + { + measurement: "cpu", + field: "value", + typ: influxql.Float, + }, + { + measurement: "cpu", + field: "host", + typ: influxql.Tag, + }, + { + measurement: "cpu", + field: "region", + typ: influxql.Tag, + }, + { + measurement: "cpu", + field: "val2", + typ: influxql.Float, + }, + { + measurement: "cpu", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "mem", + field: "value", + typ: influxql.Integer, + }, + { + measurement: "mem", + field: "val3", + typ: influxql.Boolean, + }, + { + measurement: "mem", + field: "host", + typ: influxql.Tag, + }, + { + measurement: "unknown", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_fieldKeys", + field: "fieldKey", + typ: influxql.String, + }, + { + measurement: "_fieldKeys", + field: "fieldType", + typ: influxql.String, + }, + { + measurement: "_fieldKeys", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_series", + field: "key", + typ: influxql.String, + }, + { + measurement: "_series", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_tagKeys", + field: "tagKey", + typ: influxql.String, + }, + { + measurement: "_tagKeys", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_reserved", + field: "value", + typ: influxql.String, + }, + { + measurement: "_reserved", + field: "region", + typ: influxql.Tag, + }, + } { + name := fmt.Sprintf("%s_%s_%s", index, tt.measurement, tt.field) + t.Run(name, func(t *testing.T) { + typ := sh.MapType(tt.measurement, tt.field) + if have, want := typ, tt.typ; have != want { + t.Errorf("unexpected data type: have=%#v want=%#v", have, want) + } + }) + } + sh.Close() } } func TestShard_MeasurementsByRegex(t *testing.T) { - sh := NewShard() + var sh *Shard + setup := func(index string) { + sh = NewShard(index) + if err := sh.Open(); err != nil { + t.Fatal(err) + } - if err := sh.Open(); err != nil { - t.Fatal(err) - } - defer sh.Close() - - sh.MustWritePointsString(` + sh.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 mem,host=serverA value=25i 0 mem,host=serverB value=50i,val3=t 10 `) + } - for _, tt := range []struct { - regex string - measurements []string - }{ - {regex: `cpu`, measurements: []string{"cpu"}}, - {regex: `mem`, measurements: []string{"mem"}}, - {regex: `cpu|mem`, measurements: []string{"cpu", "mem"}}, - {regex: `gpu`, measurements: []string{}}, - {regex: `pu`, measurements: []string{"cpu"}}, - {regex: `p|m`, measurements: []string{"cpu", "mem"}}, - } { - t.Run(tt.regex, func(t *testing.T) { - re := regexp.MustCompile(tt.regex) - measurements := sh.MeasurementsByRegex(re) - sort.Strings(measurements) - if diff := cmp.Diff(tt.measurements, measurements, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("unexpected measurements:\n%s", diff) - } - }) + for _, index := range tsdb.RegisteredIndexes() { + setup(index) + for _, tt := range []struct { + regex string + measurements []string + }{ + {regex: `cpu`, measurements: []string{"cpu"}}, + {regex: `mem`, measurements: []string{"mem"}}, + {regex: `cpu|mem`, measurements: []string{"cpu", "mem"}}, + {regex: `gpu`, measurements: []string{}}, + {regex: `pu`, measurements: []string{"cpu"}}, + {regex: `p|m`, measurements: []string{"cpu", "mem"}}, + } { + t.Run(index+"_"+tt.regex, func(t *testing.T) { + re := regexp.MustCompile(tt.regex) + measurements := sh.MeasurementsByRegex(re) + sort.Strings(measurements) + if diff := cmp.Diff(tt.measurements, measurements, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("unexpected measurements:\n%s", diff) + } + }) + } + sh.Close() } } func TestShards_FieldDimensions(t *testing.T) { - shard1 := NewShard() + var shard1, shard2 *Shard - if err := shard1.Open(); err != nil { - t.Fatal(err) - } - defer shard1.Close() + setup := func(index string) { + shard1 = NewShard(index) + if err := shard1.Open(); err != nil { + t.Fatal(err) + } - shard1.MustWritePointsString(` + shard1.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - shard2 := NewShard() + shard2 = NewShard(index) + if err := shard2.Open(); err != nil { + t.Fatal(err) + } - if err := shard2.Open(); err != nil { - t.Fatal(err) - } - - shard2.MustWritePointsString(` + shard2.MustWritePointsString(` mem,host=serverA value=25i 0 mem,host=serverB value=50i,val3=t 10 _reserved,region=uswest value="foo" 0 `) + } - sh := tsdb.Shards([]*tsdb.Shard{shard1.Shard, shard2.Shard}) - for _, tt := range []struct { - sources []string - f map[string]influxql.DataType - d map[string]struct{} - }{ - { - sources: []string{"cpu"}, - f: map[string]influxql.DataType{ - "value": influxql.Float, - "val2": influxql.Float, + for _, index := range tsdb.RegisteredIndexes() { + setup(index) + sh := tsdb.Shards([]*tsdb.Shard{shard1.Shard, shard2.Shard}) + for _, tt := range []struct { + sources []string + f map[string]influxql.DataType + d map[string]struct{} + }{ + { + sources: []string{"cpu"}, + f: map[string]influxql.DataType{ + "value": influxql.Float, + "val2": influxql.Float, + }, + d: map[string]struct{}{ + "host": {}, + "region": {}, + }, }, - d: map[string]struct{}{ - "host": {}, - "region": {}, + { + sources: []string{"mem"}, + f: map[string]influxql.DataType{ + "value": influxql.Integer, + "val3": influxql.Boolean, + }, + d: map[string]struct{}{ + "host": {}, + }, }, - }, - { - sources: []string{"mem"}, - f: map[string]influxql.DataType{ - "value": influxql.Integer, - "val3": influxql.Boolean, + { + sources: []string{"cpu", "mem"}, + f: map[string]influxql.DataType{ + "value": influxql.Float, + "val2": influxql.Float, + "val3": influxql.Boolean, + }, + d: map[string]struct{}{ + "host": {}, + "region": {}, + }, }, - d: map[string]struct{}{ - "host": {}, + { + sources: []string{"_fieldKeys"}, + f: map[string]influxql.DataType{ + "fieldKey": influxql.String, + "fieldType": influxql.String, + }, + d: map[string]struct{}{}, }, - }, - { - sources: []string{"cpu", "mem"}, - f: map[string]influxql.DataType{ - "value": influxql.Float, - "val2": influxql.Float, - "val3": influxql.Boolean, + { + sources: []string{"_series"}, + f: map[string]influxql.DataType{ + "key": influxql.String, + }, + d: map[string]struct{}{}, }, - d: map[string]struct{}{ - "host": {}, - "region": {}, + { + sources: []string{"_tagKeys"}, + f: map[string]influxql.DataType{ + "tagKey": influxql.String, + }, + d: map[string]struct{}{}, }, - }, - { - sources: []string{"_fieldKeys"}, - f: map[string]influxql.DataType{ - "fieldKey": influxql.String, - "fieldType": influxql.String, + { + sources: []string{"_reserved"}, + f: map[string]influxql.DataType{ + "value": influxql.String, + }, + d: map[string]struct{}{ + "region": {}, + }, }, - d: map[string]struct{}{}, - }, - { - sources: []string{"_series"}, - f: map[string]influxql.DataType{ - "key": influxql.String, + { + sources: []string{"unknown"}, + f: map[string]influxql.DataType{}, + d: map[string]struct{}{}, }, - d: map[string]struct{}{}, - }, - { - sources: []string{"_tagKeys"}, - f: map[string]influxql.DataType{ - "tagKey": influxql.String, - }, - d: map[string]struct{}{}, - }, - { - sources: []string{"_reserved"}, - f: map[string]influxql.DataType{ - "value": influxql.String, - }, - d: map[string]struct{}{ - "region": {}, - }, - }, - { - sources: []string{"unknown"}, - f: map[string]influxql.DataType{}, - d: map[string]struct{}{}, - }, - } { - name := strings.Join(tt.sources, ",") - t.Run(name, func(t *testing.T) { - f, d, err := sh.FieldDimensions(tt.sources) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + } { + name := fmt.Sprintf("%s_%s", index, strings.Join(tt.sources, ",")) + t.Run(name, func(t *testing.T) { + f, d, err := sh.FieldDimensions(tt.sources) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } - if diff := cmp.Diff(tt.f, f, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("unexpected fields:\n%s", diff) - } - if diff := cmp.Diff(tt.d, d, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("unexpected dimensions:\n%s", diff) - } - }) + if diff := cmp.Diff(tt.f, f, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("unexpected fields:\n%s", diff) + } + if diff := cmp.Diff(tt.d, d, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("unexpected dimensions:\n%s", diff) + } + }) + } + shard1.Close() + shard2.Close() } } func TestShards_MapType(t *testing.T) { - shard1 := NewShard() + var shard1, shard2 *Shard - if err := shard1.Open(); err != nil { - t.Fatal(err) - } - defer shard1.Close() + setup := func(index string) { + shard1 = NewShard(index) + if err := shard1.Open(); err != nil { + t.Fatal(err) + } - shard1.MustWritePointsString(` + shard1.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - shard2 := NewShard() + shard2 = NewShard(index) + if err := shard2.Open(); err != nil { + t.Fatal(err) + } - if err := shard2.Open(); err != nil { - t.Fatal(err) - } - - shard2.MustWritePointsString(` + shard2.MustWritePointsString(` mem,host=serverA value=25i 0 mem,host=serverB value=50i,val3=t 10 _reserved,region=uswest value="foo" 0 `) + } - sh := tsdb.Shards([]*tsdb.Shard{shard1.Shard, shard2.Shard}) - for _, tt := range []struct { - measurement string - field string - typ influxql.DataType - }{ - { - measurement: "cpu", - field: "value", - typ: influxql.Float, - }, - { - measurement: "cpu", - field: "host", - typ: influxql.Tag, - }, - { - measurement: "cpu", - field: "region", - typ: influxql.Tag, - }, - { - measurement: "cpu", - field: "val2", - typ: influxql.Float, - }, - { - measurement: "cpu", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "mem", - field: "value", - typ: influxql.Integer, - }, - { - measurement: "mem", - field: "val3", - typ: influxql.Boolean, - }, - { - measurement: "mem", - field: "host", - typ: influxql.Tag, - }, - { - measurement: "unknown", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_fieldKeys", - field: "fieldKey", - typ: influxql.String, - }, - { - measurement: "_fieldKeys", - field: "fieldType", - typ: influxql.String, - }, - { - measurement: "_fieldKeys", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_series", - field: "key", - typ: influxql.String, - }, - { - measurement: "_series", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_tagKeys", - field: "tagKey", - typ: influxql.String, - }, - { - measurement: "_tagKeys", - field: "unknown", - typ: influxql.Unknown, - }, - { - measurement: "_reserved", - field: "value", - typ: influxql.String, - }, - { - measurement: "_reserved", - field: "region", - typ: influxql.Tag, - }, - } { - name := fmt.Sprintf("%s - %s", tt.measurement, tt.field) - t.Run(name, func(t *testing.T) { - typ := sh.MapType(tt.measurement, tt.field) - if have, want := typ, tt.typ; have != want { - t.Errorf("unexpected data type: have=%#v want=%#v", have, want) - } - }) + for _, index := range tsdb.RegisteredIndexes() { + setup(index) + sh := tsdb.Shards([]*tsdb.Shard{shard1.Shard, shard2.Shard}) + for _, tt := range []struct { + measurement string + field string + typ influxql.DataType + }{ + { + measurement: "cpu", + field: "value", + typ: influxql.Float, + }, + { + measurement: "cpu", + field: "host", + typ: influxql.Tag, + }, + { + measurement: "cpu", + field: "region", + typ: influxql.Tag, + }, + { + measurement: "cpu", + field: "val2", + typ: influxql.Float, + }, + { + measurement: "cpu", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "mem", + field: "value", + typ: influxql.Integer, + }, + { + measurement: "mem", + field: "val3", + typ: influxql.Boolean, + }, + { + measurement: "mem", + field: "host", + typ: influxql.Tag, + }, + { + measurement: "unknown", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_fieldKeys", + field: "fieldKey", + typ: influxql.String, + }, + { + measurement: "_fieldKeys", + field: "fieldType", + typ: influxql.String, + }, + { + measurement: "_fieldKeys", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_series", + field: "key", + typ: influxql.String, + }, + { + measurement: "_series", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_tagKeys", + field: "tagKey", + typ: influxql.String, + }, + { + measurement: "_tagKeys", + field: "unknown", + typ: influxql.Unknown, + }, + { + measurement: "_reserved", + field: "value", + typ: influxql.String, + }, + { + measurement: "_reserved", + field: "region", + typ: influxql.Tag, + }, + } { + name := fmt.Sprintf("%s_%s_%s", index, tt.measurement, tt.field) + t.Run(name, func(t *testing.T) { + typ := sh.MapType(tt.measurement, tt.field) + if have, want := typ, tt.typ; have != want { + t.Errorf("unexpected data type: have=%#v want=%#v", have, want) + } + }) + } + shard1.Close() + shard2.Close() } } func TestShards_MeasurementsByRegex(t *testing.T) { - shard1 := NewShard() + var shard1, shard2 *Shard - if err := shard1.Open(); err != nil { - t.Fatal(err) - } - defer shard1.Close() + setup := func(index string) { + shard1 = NewShard(index) + if err := shard1.Open(); err != nil { + t.Fatal(err) + } - shard1.MustWritePointsString(` + shard1.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - shard2 := NewShard() + shard2 = NewShard(index) + if err := shard2.Open(); err != nil { + t.Fatal(err) + } - if err := shard2.Open(); err != nil { - t.Fatal(err) - } - - shard2.MustWritePointsString(` + shard2.MustWritePointsString(` mem,host=serverA value=25i 0 mem,host=serverB value=50i,val3=t 10 _reserved,region=uswest value="foo" 0 `) + } - sh := tsdb.Shards([]*tsdb.Shard{shard1.Shard, shard2.Shard}) - for _, tt := range []struct { - regex string - measurements []string - }{ - {regex: `cpu`, measurements: []string{"cpu"}}, - {regex: `mem`, measurements: []string{"mem"}}, - {regex: `cpu|mem`, measurements: []string{"cpu", "mem"}}, - {regex: `gpu`, measurements: []string{}}, - {regex: `pu`, measurements: []string{"cpu"}}, - {regex: `p|m`, measurements: []string{"cpu", "mem"}}, - } { - t.Run(tt.regex, func(t *testing.T) { - re := regexp.MustCompile(tt.regex) - measurements := sh.MeasurementsByRegex(re) - sort.Strings(measurements) - if diff := cmp.Diff(tt.measurements, measurements, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("unexpected measurements:\n%s", diff) - } - }) + for _, index := range tsdb.RegisteredIndexes() { + setup(index) + sh := tsdb.Shards([]*tsdb.Shard{shard1.Shard, shard2.Shard}) + for _, tt := range []struct { + regex string + measurements []string + }{ + {regex: `cpu`, measurements: []string{"cpu"}}, + {regex: `mem`, measurements: []string{"mem"}}, + {regex: `cpu|mem`, measurements: []string{"cpu", "mem"}}, + {regex: `gpu`, measurements: []string{}}, + {regex: `pu`, measurements: []string{"cpu"}}, + {regex: `p|m`, measurements: []string{"cpu", "mem"}}, + } { + t.Run(tt.regex, func(t *testing.T) { + re := regexp.MustCompile(tt.regex) + measurements := sh.MeasurementsByRegex(re) + sort.Strings(measurements) + if diff := cmp.Diff(tt.measurements, measurements, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("unexpected measurements:\n%s", diff) + } + }) + } + shard1.Close() + shard2.Close() } } @@ -1636,7 +1709,7 @@ type Shard struct { } // NewShard returns a new instance of Shard with temp paths. -func NewShard() *Shard { +func NewShard(index string) *Shard { // Create temporary path for data and WAL. dir, err := ioutil.TempDir("", "influxdb-tsdb-") if err != nil { @@ -1645,8 +1718,11 @@ func NewShard() *Shard { // Build engine options. opt := tsdb.NewEngineOptions() + opt.IndexVersion = index opt.Config.WALDir = filepath.Join(dir, "wal") - opt.InmemIndex = inmem.NewIndex(path.Base(dir)) + if index == "inmem" { + opt.InmemIndex = inmem.NewIndex(path.Base(dir)) + } return &Shard{ Shard: tsdb.NewShard(0,