Run most tests for both indexes
parent
f64f6c5d34
commit
bca4393494
|
@ -29,69 +29,75 @@ import (
|
|||
func TestStore_DeleteRetentionPolicy(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Create a new shard under the same retention policy, and verify
|
||||
// that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(2); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Create a new shard under a different retention policy, and
|
||||
// verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp1", 3, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(3); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Deleting the rp0 retention policy does not return an error.
|
||||
if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// It deletes the shards under that retention policy.
|
||||
if sh := s.Shard(1); sh != nil {
|
||||
t.Errorf("shard 1 was not deleted")
|
||||
}
|
||||
|
||||
if sh := s.Shard(2); sh != nil {
|
||||
t.Errorf("shard 2 was not deleted")
|
||||
}
|
||||
|
||||
// It deletes the retention policy directory.
|
||||
if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp {
|
||||
t.Error("directory exists, but should have been removed")
|
||||
}
|
||||
|
||||
// It deletes the WAL retention policy directory.
|
||||
if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp {
|
||||
t.Error("directory exists, but should have been removed")
|
||||
}
|
||||
|
||||
// Reopen other shard and check it still exists.
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Error(err)
|
||||
} else if sh := s.Shard(3); sh == nil {
|
||||
t.Errorf("shard 3 does not exist")
|
||||
}
|
||||
|
||||
// It does not delete other retention policy directories.
|
||||
if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp {
|
||||
t.Error("directory does not exist, but should")
|
||||
}
|
||||
if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp {
|
||||
t.Error("directory does not exist, but should")
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new shard under the same retention policy, and verify
|
||||
// that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(2); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Create a new shard under a different retention policy, and
|
||||
// verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp1", 3, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(3); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Deleting the rp0 retention policy does not return an error.
|
||||
if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// It deletes the shards under that retention policy.
|
||||
if sh := s.Shard(1); sh != nil {
|
||||
t.Errorf("shard 1 was not deleted")
|
||||
}
|
||||
|
||||
if sh := s.Shard(2); sh != nil {
|
||||
t.Errorf("shard 2 was not deleted")
|
||||
}
|
||||
|
||||
// It deletes the retention policy directory.
|
||||
if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp {
|
||||
t.Error("directory exists, but should have been removed")
|
||||
}
|
||||
|
||||
// It deletes the WAL retention policy directory.
|
||||
if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp {
|
||||
t.Error("directory exists, but should have been removed")
|
||||
}
|
||||
|
||||
// Reopen other shard and check it still exists.
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Error(err)
|
||||
} else if sh := s.Shard(3); sh == nil {
|
||||
t.Errorf("shard 3 does not exist")
|
||||
}
|
||||
|
||||
// It does not delete other retention policy directories.
|
||||
if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp {
|
||||
t.Error("directory does not exist, but should")
|
||||
}
|
||||
if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp {
|
||||
t.Error("directory does not exist, but should")
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,30 +105,36 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
|
|||
func TestStore_CreateShard(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Create another shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(2); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard(1)")
|
||||
} else if sh = s.Shard(2); sh == nil {
|
||||
t.Fatalf("expected shard(2)")
|
||||
}
|
||||
}
|
||||
|
||||
// Create another shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(2); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard(1)")
|
||||
} else if sh = s.Shard(2); sh == nil {
|
||||
t.Fatalf("expected shard(2)")
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -130,21 +142,27 @@ func TestStore_CreateShard(t *testing.T) {
|
|||
func TestStore_DeleteShard(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("shard exists")
|
||||
}
|
||||
}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("shard exists")
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -152,58 +170,71 @@ func TestStore_DeleteShard(t *testing.T) {
|
|||
func TestStore_CreateShardSnapShot(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
// Create a new shard and verify that it exists.
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard")
|
||||
}
|
||||
|
||||
dir, e := s.CreateShardSnapshot(1)
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if dir == "" {
|
||||
t.Fatal("empty directory name")
|
||||
}
|
||||
}
|
||||
|
||||
dir, e := s.CreateShardSnapshot(1)
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if dir == "" {
|
||||
t.Fatal("empty directory name")
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_Open(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := NewStore()
|
||||
s.EngineOptions.IndexVersion = index
|
||||
defer s.Close()
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store should ignore shard since it does not have a numeric name.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 2 {
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
} else if n := s.ShardN(); n != 3 {
|
||||
t.Fatalf("unexpected shard count: %d", n)
|
||||
}
|
||||
|
||||
expDatabases := []string{"db0", "db1"}
|
||||
gotDatabases := s.Databases()
|
||||
sort.Strings(gotDatabases)
|
||||
|
||||
if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %#v, expected %#v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store should ignore shard since it does not have a numeric name.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 2 {
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
} else if n := s.ShardN(); n != 3 {
|
||||
t.Fatalf("unexpected shard count: %d", n)
|
||||
}
|
||||
|
||||
expDatabases := []string{"db0", "db1"}
|
||||
gotDatabases := s.Databases()
|
||||
sort.Strings(gotDatabases)
|
||||
|
||||
if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %#v, expected %#v", got, exp)
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -211,19 +242,26 @@ func TestStore_Open(t *testing.T) {
|
|||
func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := NewStore()
|
||||
s.EngineOptions.IndexVersion = index
|
||||
defer s.Close()
|
||||
|
||||
// Create a file instead of a directory for a database.
|
||||
if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
|
||||
t.Fatal(err)
|
||||
// Create a file instead of a directory for a database.
|
||||
if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store should ignore database since it's a file.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 0 {
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
// Store should ignore database since it's a file.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 0 {
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,23 +269,30 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
|
|||
func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := NewStore()
|
||||
s.EngineOptions.IndexVersion = index
|
||||
defer s.Close()
|
||||
|
||||
// Create an RP file instead of a directory.
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0")); err != nil {
|
||||
t.Fatal(err)
|
||||
// Create an RP file instead of a directory.
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store should ignore retention policy since it's a file, and there should
|
||||
// be no indices created.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 0 {
|
||||
t.Log(s.Databases())
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
// Store should ignore retention policy since it's a file, and there should
|
||||
// be no indices created.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 0 {
|
||||
t.Log(s.Databases())
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -255,23 +300,30 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
|
|||
func TestStore_Open_InvalidShard(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := NewStore()
|
||||
s.EngineOptions.IndexVersion = index
|
||||
defer s.Close()
|
||||
|
||||
// Create a non-numeric shard file.
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0", "bad_shard")); err != nil {
|
||||
t.Fatal(err)
|
||||
// Create a non-numeric shard file.
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0", "bad_shard")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Store should ignore shard since it does not have a numeric name.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 0 {
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
} else if n := s.ShardN(); n != 0 {
|
||||
t.Fatalf("unexpected shard count: %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
// Store should ignore shard since it does not have a numeric name.
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n := len(s.Databases()); n != 0 {
|
||||
t.Fatalf("unexpected database index count: %d", n)
|
||||
} else if n := s.ShardN(); n != 0 {
|
||||
t.Fatalf("unexpected shard count: %d", n)
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,179 +331,202 @@ func TestStore_Open_InvalidShard(t *testing.T) {
|
|||
func TestShards_CreateIterator(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard #0 with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 0,
|
||||
`cpu,host=serverA value=1 0`,
|
||||
`cpu,host=serverA value=2 10`,
|
||||
`cpu,host=serverB value=3 20`,
|
||||
)
|
||||
// Create shard #0 with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 0,
|
||||
`cpu,host=serverA value=1 0`,
|
||||
`cpu,host=serverA value=2 10`,
|
||||
`cpu,host=serverB value=3 20`,
|
||||
)
|
||||
|
||||
// Create shard #1 with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 1,
|
||||
`cpu,host=serverA value=1 30`,
|
||||
`mem,host=serverA value=2 40`, // skip: wrong source
|
||||
`cpu,host=serverC value=3 60`,
|
||||
)
|
||||
// Create shard #1 with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 1,
|
||||
`cpu,host=serverA value=1 30`,
|
||||
`mem,host=serverA value=2 40`, // skip: wrong source
|
||||
`cpu,host=serverC value=3 60`,
|
||||
)
|
||||
|
||||
// Retrieve shard group.
|
||||
shards := s.ShardGroup([]uint64{0, 1})
|
||||
// Retrieve shard group.
|
||||
shards := s.ShardGroup([]uint64{0, 1})
|
||||
|
||||
// Create iterator.
|
||||
itr, err := shards.CreateIterator("cpu", query.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Dimensions: []string{"host"},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer itr.Close()
|
||||
fitr := itr.(query.FloatIterator)
|
||||
// Create iterator.
|
||||
itr, err := shards.CreateIterator("cpu", query.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Dimensions: []string{"host"},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer itr.Close()
|
||||
fitr := itr.(query.FloatIterator)
|
||||
|
||||
// Read values from iterator. The host=serverA points should come first.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(0): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(1): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(2): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(30, 0).UnixNano(), Value: 1}) {
|
||||
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
|
||||
// Read values from iterator. The host=serverA points should come first.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(0): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(1): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(2): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(30, 0).UnixNano(), Value: 1}) {
|
||||
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
// Next the host=serverB point.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(3): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverB"), Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
|
||||
t.Fatalf("unexpected point(3): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
// And finally the host=serverC point.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(4): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverC"), Time: time.Unix(60, 0).UnixNano(), Value: 3}) {
|
||||
t.Fatalf("unexpected point(4): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
// Then an EOF should occur.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("expected eof, got error: %s", err)
|
||||
} else if p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Next the host=serverB point.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(3): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverB"), Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
|
||||
t.Fatalf("unexpected point(3): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
// And finally the host=serverC point.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("unexpected error(4): %s", err)
|
||||
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverC"), Time: time.Unix(60, 0).UnixNano(), Value: 3}) {
|
||||
t.Fatalf("unexpected point(4): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
// Then an EOF should occur.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
t.Fatalf("expected eof, got error: %s", err)
|
||||
} else if p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the store can backup a shard and another store can restore it.
|
||||
func TestStore_BackupRestoreShard(t *testing.T) {
|
||||
t.Parallel()
|
||||
// t.Parallel()
|
||||
|
||||
s0, s1 := MustOpenStore(), MustOpenStore()
|
||||
defer s0.Close()
|
||||
defer s1.Close()
|
||||
test := func(index string) {
|
||||
s0, s1 := MustOpenStore(index), MustOpenStore(index)
|
||||
defer s0.Close()
|
||||
defer s1.Close()
|
||||
|
||||
// Create shard with data.
|
||||
s0.MustCreateShardWithData("db0", "rp0", 100,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
`cpu value=3 20`,
|
||||
)
|
||||
// Create shard with data.
|
||||
s0.MustCreateShardWithData("db0", "rp0", 100,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
`cpu value=3 20`,
|
||||
)
|
||||
|
||||
if err := s0.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
if err := s0.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Backup shard to a buffer.
|
||||
var buf bytes.Buffer
|
||||
if err := s0.BackupShard(100, time.Time{}, &buf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create the shard on the other store and restore from buffer.
|
||||
if err := s1.CreateShard("db0", "rp0", 100, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s1.RestoreShard(100, &buf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Read data from
|
||||
itr, err := s0.Shard(100).CreateIterator("cpu", query.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fitr := itr.(query.FloatIterator)
|
||||
|
||||
// Read values from iterator. The host=serverA points should come first.
|
||||
p, e := fitr.Next()
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
p, e = fitr.Next()
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
p, e = fitr.Next()
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
|
||||
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Backup shard to a buffer.
|
||||
var buf bytes.Buffer
|
||||
if err := s0.BackupShard(100, time.Time{}, &buf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create the shard on the other store and restore from buffer.
|
||||
if err := s1.CreateShard("db0", "rp0", 100, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s1.RestoreShard(100, &buf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Read data from
|
||||
itr, err := s0.Shard(100).CreateIterator("cpu", query.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fitr := itr.(query.FloatIterator)
|
||||
|
||||
// Read values from iterator. The host=serverA points should come first.
|
||||
p, e := fitr.Next()
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
p, e = fitr.Next()
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
p, e = fitr.Next()
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
|
||||
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) {
|
||||
if index == "tsi1" {
|
||||
t.Skip("Skipping failing test for tsi1")
|
||||
}
|
||||
test(index)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 1,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
`cpu value=3 20`,
|
||||
)
|
||||
// Create shard with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 1,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
`cpu value=3 20`,
|
||||
)
|
||||
|
||||
// Create 2nd shard w/ same measurements.
|
||||
s.MustCreateShardWithData("db0", "rp0", 2,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
`cpu value=3 20`,
|
||||
)
|
||||
// Create 2nd shard w/ same measurements.
|
||||
s.MustCreateShardWithData("db0", "rp0", 2,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
`cpu value=3 20`,
|
||||
)
|
||||
|
||||
meas, err := s.MeasurementNames("db0", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error with MeasurementNames: %v", err)
|
||||
meas, err := s.MeasurementNames("db0", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error with MeasurementNames: %v", err)
|
||||
}
|
||||
|
||||
if exp, got := 1, len(meas); exp != got {
|
||||
t.Fatalf("measurement len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if exp, got := "cpu", string(meas[0]); exp != got {
|
||||
t.Fatalf("measurement name mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
if exp, got := 1, len(meas); exp != got {
|
||||
t.Fatalf("measurement len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if exp, got := "cpu", string(meas[0]); exp != got {
|
||||
t.Fatalf("measurement name mismatch: exp %v, got %v", exp, got)
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -862,8 +937,7 @@ func TestStore_TagValues(t *testing.T) {
|
|||
|
||||
var s *Store
|
||||
setup := func(index string) {
|
||||
s = MustOpenStore()
|
||||
s.EngineOptions.IndexVersion = index
|
||||
s = MustOpenStore(index)
|
||||
|
||||
fmtStr := `cpu%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
|
||||
cpu%[1]d,host=nofoo value=1 %[4]d
|
||||
|
@ -975,7 +1049,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(
|
|||
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
|
||||
var path string
|
||||
if err := func() error {
|
||||
store := MustOpenStore()
|
||||
store := MustOpenStore("inmem")
|
||||
defer store.Store.Close()
|
||||
path = store.Path()
|
||||
|
||||
|
@ -1168,10 +1242,11 @@ func NewStore() *Store {
|
|||
return s
|
||||
}
|
||||
|
||||
// MustOpenStore returns a new, open Store using the default index,
|
||||
// MustOpenStore returns a new, open Store using the specified index,
|
||||
// at a temporary path.
|
||||
func MustOpenStore() *Store {
|
||||
func MustOpenStore(index string) *Store {
|
||||
s := NewStore()
|
||||
s.EngineOptions.IndexVersion = index
|
||||
if err := s.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue