From bca43934941fe055ed7acccc4ccad30b007e26ee Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Fri, 4 Aug 2017 17:27:40 +0100
Subject: [PATCH] Run most tests for both indexes

---
 tsdb/store_test.go | 717 +++++++++++++++++++++++++--------------------
 1 file changed, 396 insertions(+), 321 deletions(-)

diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 4144a2fc2d..3507358820 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -29,69 +29,75 @@ import (
 func TestStore_DeleteRetentionPolicy(t *testing.T) {
 	t.Parallel()
 
-	s := MustOpenStore()
-	defer s.Close()
+	test := func(index string) {
+		s := MustOpenStore(index)
+		defer s.Close()
 
-	// Create a new shard and verify that it exists.
-	if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(1); sh == nil {
-		t.Fatalf("expected shard")
+		// Create a new shard and verify that it exists.
+		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(1); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		// Create a new shard under the same retention policy, and verify
+		// that it exists.
+		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(2); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		// Create a new shard under a different retention policy, and
+		// verify that it exists.
+		if err := s.CreateShard("db0", "rp1", 3, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(3); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		// Deleting the rp0 retention policy does not return an error.
+		if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil {
+			t.Fatal(err)
+		}
+
+		// It deletes the shards under that retention policy.
+		if sh := s.Shard(1); sh != nil {
+			t.Errorf("shard 1 was not deleted")
+		}
+
+		if sh := s.Shard(2); sh != nil {
+			t.Errorf("shard 2 was not deleted")
+		}
+
+		// It deletes the retention policy directory.
+		if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp {
+			t.Error("directory exists, but should have been removed")
+		}
+
+		// It deletes the WAL retention policy directory.
+		if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp {
+			t.Error("directory exists, but should have been removed")
+		}
+
+		// Reopen other shard and check it still exists.
+		if err := s.Reopen(); err != nil {
+			t.Error(err)
+		} else if sh := s.Shard(3); sh == nil {
+			t.Errorf("shard 3 does not exist")
+		}
+
+		// It does not delete other retention policy directories.
+		if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp {
+			t.Error("directory does not exist, but should")
+		}
+		if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp {
+			t.Error("directory does not exist, but should")
+		}
 	}
 
-	// Create a new shard under the same retention policy, and verify
-	// that it exists.
-	if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(2); sh == nil {
-		t.Fatalf("expected shard")
-	}
-
-	// Create a new shard under a different retention policy, and
-	// verify that it exists.
-	if err := s.CreateShard("db0", "rp1", 3, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(3); sh == nil {
-		t.Fatalf("expected shard")
-	}
-
-	// Deleting the rp0 retention policy does not return an error.
-	if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil {
-		t.Fatal(err)
-	}
-
-	// It deletes the shards under that retention policy.
-	if sh := s.Shard(1); sh != nil {
-		t.Errorf("shard 1 was not deleted")
-	}
-
-	if sh := s.Shard(2); sh != nil {
-		t.Errorf("shard 2 was not deleted")
-	}
-
-	// It deletes the retention policy directory.
-	if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp {
-		t.Error("directory exists, but should have been removed")
-	}
-
-	// It deletes the WAL retention policy directory.
-	if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp {
-		t.Error("directory exists, but should have been removed")
-	}
-
-	// Reopen other shard and check it still exists.
-	if err := s.Reopen(); err != nil {
-		t.Error(err)
-	} else if sh := s.Shard(3); sh == nil {
-		t.Errorf("shard 3 does not exist")
-	}
-
-	// It does not delete other retention policy directories.
-	if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp {
-		t.Error("directory does not exist, but should")
-	}
-	if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp {
-		t.Error("directory does not exist, but should")
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -99,30 +105,36 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
 func TestStore_CreateShard(t *testing.T) {
 	t.Parallel()
 
-	s := MustOpenStore()
-	defer s.Close()
+	test := func(index string) {
+		s := MustOpenStore(index)
+		defer s.Close()
 
-	// Create a new shard and verify that it exists.
-	if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(1); sh == nil {
-		t.Fatalf("expected shard")
+		// Create a new shard and verify that it exists.
+		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(1); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		// Create another shard and verify that it exists.
+		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(2); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		// Reopen shard and recheck.
+		if err := s.Reopen(); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(1); sh == nil {
+			t.Fatalf("expected shard(1)")
+		} else if sh = s.Shard(2); sh == nil {
+			t.Fatalf("expected shard(2)")
+		}
 	}
 
-	// Create another shard and verify that it exists.
-	if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(2); sh == nil {
-		t.Fatalf("expected shard")
-	}
-
-	// Reopen shard and recheck.
-	if err := s.Reopen(); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(1); sh == nil {
-		t.Fatalf("expected shard(1)")
-	} else if sh = s.Shard(2); sh == nil {
-		t.Fatalf("expected shard(2)")
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -130,21 +142,27 @@ func TestStore_CreateShard(t *testing.T) {
 func TestStore_DeleteShard(t *testing.T) {
 	t.Parallel()
 
-	s := MustOpenStore()
-	defer s.Close()
+	test := func(index string) {
+		s := MustOpenStore(index)
+		defer s.Close()
 
-	// Create a new shard and verify that it exists.
-	if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(1); sh == nil {
-		t.Fatalf("expected shard")
+		// Create a new shard and verify that it exists.
+		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(1); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		// Reopen shard and recheck.
+		if err := s.Reopen(); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(1); sh == nil {
+			t.Fatalf("shard 1 does not exist after reopen")
+		}
 	}
 
-	// Reopen shard and recheck.
-	if err := s.Reopen(); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(1); sh == nil {
-		t.Fatalf("shard exists")
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -152,58 +170,71 @@ func TestStore_DeleteShard(t *testing.T) {
 func TestStore_CreateShardSnapShot(t *testing.T) {
 	t.Parallel()
 
-	s := MustOpenStore()
-	defer s.Close()
+	test := func(index string) {
+		s := MustOpenStore(index)
+		defer s.Close()
 
-	// Create a new shard and verify that it exists.
-	if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
-		t.Fatal(err)
-	} else if sh := s.Shard(1); sh == nil {
-		t.Fatalf("expected shard")
+		// Create a new shard and verify that it exists.
+		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
+			t.Fatal(err)
+		} else if sh := s.Shard(1); sh == nil {
+			t.Fatalf("expected shard")
+		}
+
+		dir, e := s.CreateShardSnapshot(1)
+		if e != nil {
+			t.Fatal(e)
+		}
+		if dir == "" {
+			t.Fatal("empty directory name")
+		}
 	}
 
-	dir, e := s.CreateShardSnapshot(1)
-	if e != nil {
-		t.Fatal(e)
-	}
-	if dir == "" {
-		t.Fatal("empty directory name")
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
	}
 }
 
 func TestStore_Open(t *testing.T) {
 	t.Parallel()
 
-	s := NewStore()
-	defer s.Close()
+	test := func(index string) {
+		s := NewStore()
+		s.EngineOptions.IndexVersion = index
+		defer s.Close()
 
-	if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
-		t.Fatal(err)
+		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil {
+			t.Fatal(err)
+		}
+
+		// Store should ignore shard since it does not have a numeric name.
+		if err := s.Open(); err != nil {
+			t.Fatal(err)
+		} else if n := len(s.Databases()); n != 2 {
+			t.Fatalf("unexpected database index count: %d", n)
+		} else if n := s.ShardN(); n != 3 {
+			t.Fatalf("unexpected shard count: %d", n)
+		}
+
+		expDatabases := []string{"db0", "db1"}
+		gotDatabases := s.Databases()
+		sort.Strings(gotDatabases)
+
+		if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) {
+			t.Fatalf("got %#v, expected %#v", got, exp)
+		}
 	}
 
-	if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil {
-		t.Fatal(err)
-	}
-
-	if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil {
-		t.Fatal(err)
-	}
-
-	// Store should ignore shard since it does not have a numeric name.
-	if err := s.Open(); err != nil {
-		t.Fatal(err)
-	} else if n := len(s.Databases()); n != 2 {
-		t.Fatalf("unexpected database index count: %d", n)
-	} else if n := s.ShardN(); n != 3 {
-		t.Fatalf("unexpected shard count: %d", n)
-	}
-
-	expDatabases := []string{"db0", "db1"}
-	gotDatabases := s.Databases()
-	sort.Strings(gotDatabases)
-
-	if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) {
-		t.Fatalf("got %#v, expected %#v", got, exp)
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -211,19 +242,26 @@ func TestStore_Open(t *testing.T) {
 func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
 	t.Parallel()
 
-	s := NewStore()
-	defer s.Close()
+	test := func(index string) {
+		s := NewStore()
+		s.EngineOptions.IndexVersion = index
+		defer s.Close()
 
-	// Create a file instead of a directory for a database.
-	if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
-		t.Fatal(err)
+		// Create a file instead of a directory for a database.
+		if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
+			t.Fatal(err)
+		}
+
+		// Store should ignore database since it's a file.
+		if err := s.Open(); err != nil {
+			t.Fatal(err)
+		} else if n := len(s.Databases()); n != 0 {
+			t.Fatalf("unexpected database index count: %d", n)
+		}
 	}
 
-	// Store should ignore database since it's a file.
-	if err := s.Open(); err != nil {
-		t.Fatal(err)
-	} else if n := len(s.Databases()); n != 0 {
-		t.Fatalf("unexpected database index count: %d", n)
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -231,23 +269,30 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
 func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
 	t.Parallel()
 
-	s := NewStore()
-	defer s.Close()
+	test := func(index string) {
+		s := NewStore()
+		s.EngineOptions.IndexVersion = index
+		defer s.Close()
 
-	// Create an RP file instead of a directory.
-	if err := os.MkdirAll(filepath.Join(s.Path(), "db0"), 0777); err != nil {
-		t.Fatal(err)
-	} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0")); err != nil {
-		t.Fatal(err)
+		// Create an RP file instead of a directory.
+		if err := os.MkdirAll(filepath.Join(s.Path(), "db0"), 0777); err != nil {
+			t.Fatal(err)
+		} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0")); err != nil {
+			t.Fatal(err)
+		}
+
+		// Store should ignore retention policy since it's a file, and there should
+		// be no indices created.
+		if err := s.Open(); err != nil {
+			t.Fatal(err)
+		} else if n := len(s.Databases()); n != 0 {
+			t.Log(s.Databases())
+			t.Fatalf("unexpected database index count: %d", n)
+		}
 	}
 
-	// Store should ignore retention policy since it's a file, and there should
-	// be no indices created.
-	if err := s.Open(); err != nil {
-		t.Fatal(err)
-	} else if n := len(s.Databases()); n != 0 {
-		t.Log(s.Databases())
-		t.Fatalf("unexpected database index count: %d", n)
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -255,23 +300,30 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
 func TestStore_Open_InvalidShard(t *testing.T) {
 	t.Parallel()
 
-	s := NewStore()
-	defer s.Close()
+	test := func(index string) {
+		s := NewStore()
+		s.EngineOptions.IndexVersion = index
+		defer s.Close()
 
-	// Create a non-numeric shard file.
-	if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0"), 0777); err != nil {
-		t.Fatal(err)
-	} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0", "bad_shard")); err != nil {
-		t.Fatal(err)
+		// Create a non-numeric shard file.
+		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0"), 0777); err != nil {
+			t.Fatal(err)
+		} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0", "bad_shard")); err != nil {
+			t.Fatal(err)
+		}
+
+		// Store should ignore shard since it does not have a numeric name.
+		if err := s.Open(); err != nil {
+			t.Fatal(err)
+		} else if n := len(s.Databases()); n != 0 {
+			t.Fatalf("unexpected database index count: %d", n)
+		} else if n := s.ShardN(); n != 0 {
+			t.Fatalf("unexpected shard count: %d", n)
+		}
 	}
 
-	// Store should ignore shard since it does not have a numeric name.
-	if err := s.Open(); err != nil {
-		t.Fatal(err)
-	} else if n := len(s.Databases()); n != 0 {
-		t.Fatalf("unexpected database index count: %d", n)
-	} else if n := s.ShardN(); n != 0 {
-		t.Fatalf("unexpected shard count: %d", n)
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -279,179 +331,202 @@ func TestStore_Open_InvalidShard(t *testing.T) {
 func TestShards_CreateIterator(t *testing.T) {
 	t.Parallel()
 
-	s := MustOpenStore()
-	defer s.Close()
+	test := func(index string) {
+		s := MustOpenStore(index)
+		defer s.Close()
 
-	// Create shard #0 with data.
-	s.MustCreateShardWithData("db0", "rp0", 0,
-		`cpu,host=serverA value=1 0`,
-		`cpu,host=serverA value=2 10`,
-		`cpu,host=serverB value=3 20`,
-	)
+		// Create shard #0 with data.
+		s.MustCreateShardWithData("db0", "rp0", 0,
+			`cpu,host=serverA value=1 0`,
+			`cpu,host=serverA value=2 10`,
+			`cpu,host=serverB value=3 20`,
+		)
 
-	// Create shard #1 with data.
-	s.MustCreateShardWithData("db0", "rp0", 1,
-		`cpu,host=serverA value=1 30`,
-		`mem,host=serverA value=2 40`, // skip: wrong source
-		`cpu,host=serverC value=3 60`,
-	)
+		// Create shard #1 with data.
+		s.MustCreateShardWithData("db0", "rp0", 1,
+			`cpu,host=serverA value=1 30`,
+			`mem,host=serverA value=2 40`, // skip: wrong source
+			`cpu,host=serverC value=3 60`,
+		)
 
-	// Retrieve shard group.
-	shards := s.ShardGroup([]uint64{0, 1})
+		// Retrieve shard group.
+		shards := s.ShardGroup([]uint64{0, 1})
 
-	// Create iterator.
-	itr, err := shards.CreateIterator("cpu", query.IteratorOptions{
-		Expr:       influxql.MustParseExpr(`value`),
-		Dimensions: []string{"host"},
-		Ascending:  true,
-		StartTime:  influxql.MinTime,
-		EndTime:    influxql.MaxTime,
-	})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer itr.Close()
-	fitr := itr.(query.FloatIterator)
+		// Create iterator.
+		itr, err := shards.CreateIterator("cpu", query.IteratorOptions{
+			Expr:       influxql.MustParseExpr(`value`),
+			Dimensions: []string{"host"},
+			Ascending:  true,
+			StartTime:  influxql.MinTime,
+			EndTime:    influxql.MaxTime,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer itr.Close()
+		fitr := itr.(query.FloatIterator)
 
-	// Read values from iterator. The host=serverA points should come first.
-	if p, err := fitr.Next(); err != nil {
-		t.Fatalf("unexpected error(0): %s", err)
-	} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
-		t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
-	}
-	if p, err := fitr.Next(); err != nil {
-		t.Fatalf("unexpected error(1): %s", err)
-	} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
-		t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
-	}
-	if p, err := fitr.Next(); err != nil {
-		t.Fatalf("unexpected error(2): %s", err)
-	} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(30, 0).UnixNano(), Value: 1}) {
-		t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
+		// Read values from iterator. The host=serverA points should come first.
+		if p, err := fitr.Next(); err != nil {
+			t.Fatalf("unexpected error(0): %s", err)
+		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
+			t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
+		}
+		if p, err := fitr.Next(); err != nil {
+			t.Fatalf("unexpected error(1): %s", err)
+		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
+			t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
+		}
+		if p, err := fitr.Next(); err != nil {
+			t.Fatalf("unexpected error(2): %s", err)
+		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(30, 0).UnixNano(), Value: 1}) {
+			t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
+		}
+
+		// Next the host=serverB point.
+		if p, err := fitr.Next(); err != nil {
+			t.Fatalf("unexpected error(3): %s", err)
+		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverB"), Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
+			t.Fatalf("unexpected point(3): %s", spew.Sdump(p))
+		}
+
+		// And finally the host=serverC point.
+		if p, err := fitr.Next(); err != nil {
+			t.Fatalf("unexpected error(4): %s", err)
+		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverC"), Time: time.Unix(60, 0).UnixNano(), Value: 3}) {
+			t.Fatalf("unexpected point(4): %s", spew.Sdump(p))
+		}
+
+		// Then an EOF should occur.
+		if p, err := fitr.Next(); err != nil {
+			t.Fatalf("expected eof, got error: %s", err)
+		} else if p != nil {
+			t.Fatalf("expected eof, got: %s", spew.Sdump(p))
+		}
 	}
 
-	// Next the host=serverB point.
-	if p, err := fitr.Next(); err != nil {
-		t.Fatalf("unexpected error(3): %s", err)
-	} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverB"), Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
-		t.Fatalf("unexpected point(3): %s", spew.Sdump(p))
-	}
-
-	// And finally the host=serverC point.
-	if p, err := fitr.Next(); err != nil {
-		t.Fatalf("unexpected error(4): %s", err)
-	} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverC"), Time: time.Unix(60, 0).UnixNano(), Value: 3}) {
-		t.Fatalf("unexpected point(4): %s", spew.Sdump(p))
-	}
-
-	// Then an EOF should occur.
-	if p, err := fitr.Next(); err != nil {
-		t.Fatalf("expected eof, got error: %s", err)
-	} else if p != nil {
-		t.Fatalf("expected eof, got: %s", spew.Sdump(p))
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
 // Ensure the store can backup a shard and another store can restore it.
 func TestStore_BackupRestoreShard(t *testing.T) {
-	t.Parallel()
+	// t.Parallel()
 
-	s0, s1 := MustOpenStore(), MustOpenStore()
-	defer s0.Close()
-	defer s1.Close()
+	test := func(index string) {
+		s0, s1 := MustOpenStore(index), MustOpenStore(index)
+		defer s0.Close()
+		defer s1.Close()
 
-	// Create shard with data.
-	s0.MustCreateShardWithData("db0", "rp0", 100,
-		`cpu value=1 0`,
-		`cpu value=2 10`,
-		`cpu value=3 20`,
-	)
+		// Create shard with data.
+		s0.MustCreateShardWithData("db0", "rp0", 100,
+			`cpu value=1 0`,
+			`cpu value=2 10`,
+			`cpu value=3 20`,
+		)
 
-	if err := s0.Reopen(); err != nil {
-		t.Fatal(err)
+		if err := s0.Reopen(); err != nil {
+			t.Fatal(err)
+		}
+
+		// Backup shard to a buffer.
+		var buf bytes.Buffer
+		if err := s0.BackupShard(100, time.Time{}, &buf); err != nil {
+			t.Fatal(err)
+		}
+
+		// Create the shard on the other store and restore from buffer.
+		if err := s1.CreateShard("db0", "rp0", 100, true); err != nil {
+			t.Fatal(err)
+		}
+		if err := s1.RestoreShard(100, &buf); err != nil {
+			t.Fatal(err)
+		}
+
+		// Read the data back out of the shard.
+		itr, err := s0.Shard(100).CreateIterator("cpu", query.IteratorOptions{
+			Expr:      influxql.MustParseExpr(`value`),
+			Ascending: true,
+			StartTime: influxql.MinTime,
+			EndTime:   influxql.MaxTime,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		fitr := itr.(query.FloatIterator)
+
+		// Read values from the iterator; the points should come back in time order.
+		p, e := fitr.Next()
+		if e != nil {
+			t.Fatal(e)
+		}
+		if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
+			t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
+		}
+		p, e = fitr.Next()
+		if e != nil {
+			t.Fatal(e)
+		}
+		if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
+			t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
+		}
+		p, e = fitr.Next()
+		if e != nil {
+			t.Fatal(e)
+		}
+		if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
+			t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
+		}
 	}
 
-	// Backup shard to a buffer.
-	var buf bytes.Buffer
-	if err := s0.BackupShard(100, time.Time{}, &buf); err != nil {
-		t.Fatal(err)
-	}
-
-	// Create the shard on the other store and restore from buffer.
-	if err := s1.CreateShard("db0", "rp0", 100, true); err != nil {
-		t.Fatal(err)
-	}
-	if err := s1.RestoreShard(100, &buf); err != nil {
-		t.Fatal(err)
-	}
-
-	// Read data from
-	itr, err := s0.Shard(100).CreateIterator("cpu", query.IteratorOptions{
-		Expr:      influxql.MustParseExpr(`value`),
-		Ascending: true,
-		StartTime: influxql.MinTime,
-		EndTime:   influxql.MaxTime,
-	})
-	if err != nil {
-		t.Fatal(err)
-	}
-	fitr := itr.(query.FloatIterator)
-
-	// Read values from iterator. The host=serverA points should come first.
-	p, e := fitr.Next()
-	if e != nil {
-		t.Fatal(e)
-	}
-	if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
-		t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
-	}
-	p, e = fitr.Next()
-	if e != nil {
-		t.Fatal(e)
-	}
-	if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
-		t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
-	}
-	p, e = fitr.Next()
-	if e != nil {
-		t.Fatal(e)
-	}
-	if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
-		t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) {
+			if index == "tsi1" {
+				t.Skip("Skipping failing test for tsi1")
+			}
+			test(index)
+		})
 	}
 }
 
 func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
 	t.Parallel()
 
-	s := MustOpenStore()
-	defer s.Close()
+	test := func(index string) {
+		s := MustOpenStore(index)
+		defer s.Close()
 
-	// Create shard with data.
-	s.MustCreateShardWithData("db0", "rp0", 1,
-		`cpu value=1 0`,
-		`cpu value=2 10`,
-		`cpu value=3 20`,
-	)
+		// Create shard with data.
+		s.MustCreateShardWithData("db0", "rp0", 1,
+			`cpu value=1 0`,
+			`cpu value=2 10`,
+			`cpu value=3 20`,
+		)
 
-	// Create 2nd shard w/ same measurements.
-	s.MustCreateShardWithData("db0", "rp0", 2,
-		`cpu value=1 0`,
-		`cpu value=2 10`,
-		`cpu value=3 20`,
-	)
+		// Create 2nd shard w/ same measurements.
+		s.MustCreateShardWithData("db0", "rp0", 2,
+			`cpu value=1 0`,
+			`cpu value=2 10`,
+			`cpu value=3 20`,
+		)
 
-	meas, err := s.MeasurementNames("db0", nil)
-	if err != nil {
-		t.Fatalf("unexpected error with MeasurementNames: %v", err)
+		meas, err := s.MeasurementNames("db0", nil)
+		if err != nil {
+			t.Fatalf("unexpected error with MeasurementNames: %v", err)
+		}
+
+		if exp, got := 1, len(meas); exp != got {
+			t.Fatalf("measurement len mismatch: exp %v, got %v", exp, got)
+		}
+
+		if exp, got := "cpu", string(meas[0]); exp != got {
+			t.Fatalf("measurement name mismatch: exp %v, got %v", exp, got)
+		}
 	}
 
-	if exp, got := 1, len(meas); exp != got {
-		t.Fatalf("measurement len mismatch: exp %v, got %v", exp, got)
-	}
-
-	if exp, got := "cpu", string(meas[0]); exp != got {
-		t.Fatalf("measurement name mismatch: exp %v, got %v", exp, got)
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
 	}
 }
 
@@ -862,8 +937,7 @@ func TestStore_TagValues(t *testing.T) {
 	var s *Store
 	setup := func(index string) {
-		s = MustOpenStore()
-		s.EngineOptions.IndexVersion = index
+		s = MustOpenStore(index)
 
 		fmtStr := `cpu%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
 	cpu%[1]d,host=nofoo value=1 %[4]d
@@ -975,7 +1049,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(
 func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
 	var path string
 	if err := func() error {
-		store := MustOpenStore()
+		store := MustOpenStore("inmem")
 		defer store.Store.Close()
 		path = store.Path()
@@ -1168,10 +1242,11 @@ func NewStore() *Store {
 	return s
 }
 
-// MustOpenStore returns a new, open Store using the default index,
+// MustOpenStore returns a new, open Store using the specified index,
 // at a temporary path.
-func MustOpenStore() *Store {
+func MustOpenStore(index string) *Store {
 	s := NewStore()
+	s.EngineOptions.IndexVersion = index
 	if err := s.Open(); err != nil {
 		panic(err)
 	}