influxdb/services/meta/client_test.go

777 lines
18 KiB
Go
Raw Normal View History

package meta_test
import (
"encoding/json"
"io/ioutil"
"net"
"os"
"path"
"runtime"
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)
func TestMetaClient_CreateDatabaseOnly(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if db, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if db.Name != "db0" {
t.Fatalf("database name mismatch. exp: db0, got %s", db.Name)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
// Make sure a default retention policy was created.
rp, err := c.RetentionPolicy("db0", "default")
if err != nil {
t.Fatal(err)
} else if rp == nil {
t.Fatal("failed to create rp")
} else if exp, got := "default", rp.Name; exp != got {
t.Fatalf("rp name wrong:\n\texp: %s\n\tgot: %s", exp, got)
}
}
func TestMetaClient_CreateDatabaseIfNotExists(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
}
func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
Duration: 1 * time.Hour,
ReplicaN: 1,
}); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
rp := db.RetentionPolicy("rp0")
if err != nil {
t.Fatal(err)
} else if rp.Name != "rp0" {
t.Fatalf("rp name wrong: %s", rp.Name)
} else if rp.Duration != time.Hour {
t.Fatalf("rp duration wrong: %s", rp.Duration.String())
} else if rp.ReplicaN != 1 {
t.Fatalf("rp replication wrong: %d", rp.ReplicaN)
}
}
func TestMetaClient_Databases(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
// Create two databases.
db, err := c.CreateDatabase("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
db, err = c.CreateDatabase("db1")
if err != nil {
t.Fatal(err)
} else if db.Name != "db1" {
t.Fatalf("db name wrong: %s", db.Name)
}
dbs, err := c.Databases()
if err != nil {
t.Fatal(err)
}
if len(dbs) != 2 {
t.Fatalf("expected 2 databases but got %d", len(dbs))
} else if dbs[0].Name != "db0" {
t.Fatalf("db name wrong: %s", dbs[0].Name)
} else if dbs[1].Name != "db1" {
t.Fatalf("db name wrong: %s", dbs[1].Name)
}
}
func TestMetaClient_DropDatabase(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatalf("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
if err := c.DropDatabase("db0"); err != nil {
t.Fatal(err)
}
if db, _ = c.Database("db0"); db != nil {
t.Fatalf("expected database to not return: %v", db)
}
// Dropping a database that does not exist is not an error.
if err := c.DropDatabase("db foo"); err != nil {
t.Fatalf("got %v error, but expected no error", err)
}
}
func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
if _, err := c.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
Duration: 1 * time.Hour,
ReplicaN: 1,
}); err != nil {
t.Fatal(err)
}
rp, err := c.RetentionPolicy("db0", "rp0")
if err != nil {
t.Fatal(err)
} else if rp.Name != "rp0" {
t.Fatalf("rp name wrong: %s", rp.Name)
} else if rp.Duration != time.Hour {
t.Fatalf("rp duration wrong: %s", rp.Duration.String())
} else if rp.ReplicaN != 1 {
t.Fatalf("rp replication wrong: %d", rp.ReplicaN)
}
// Create the same policy. Should not error.
if _, err := c.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
Duration: 1 * time.Hour,
ReplicaN: 1,
}); err != nil {
t.Fatal(err)
}
rp, err = c.RetentionPolicy("db0", "rp0")
if err != nil {
t.Fatal(err)
} else if rp.Name != "rp0" {
t.Fatalf("rp name wrong: %s", rp.Name)
} else if rp.Duration != time.Hour {
t.Fatalf("rp duration wrong: %s", rp.Duration.String())
} else if rp.ReplicaN != 1 {
t.Fatalf("rp replication wrong: %d", rp.ReplicaN)
}
}
func TestMetaClient_SetDefaultRetentionPolicy(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
Duration: 1 * time.Hour,
ReplicaN: 1,
}); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("datbase not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
rp, err := c.RetentionPolicy("db0", "rp0")
if err != nil {
t.Fatal(err)
} else if rp.Name != "rp0" {
t.Fatalf("rp name wrong: %s", rp.Name)
} else if rp.Duration != time.Hour {
t.Fatalf("rp duration wrong: %s", rp.Duration.String())
} else if rp.ReplicaN != 1 {
t.Fatalf("rp replication wrong: %d", rp.ReplicaN)
}
// Make sure default retention policy is now rp0
if exp, got := "rp0", db.DefaultRetentionPolicy; exp != got {
t.Fatalf("rp name wrong: \n\texp: %s\n\tgot: %s", exp, db.DefaultRetentionPolicy)
}
}
func TestMetaClient_DropRetentionPolicy(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
if _, err := c.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
Duration: 1 * time.Hour,
ReplicaN: 1,
}); err != nil {
t.Fatal(err)
}
rp, err := c.RetentionPolicy("db0", "rp0")
if err != nil {
t.Fatal(err)
} else if rp.Name != "rp0" {
t.Fatalf("rp name wrong: %s", rp.Name)
} else if rp.Duration != time.Hour {
t.Fatalf("rp duration wrong: %s", rp.Duration.String())
} else if rp.ReplicaN != 1 {
t.Fatalf("rp replication wrong: %d", rp.ReplicaN)
}
if err := c.DropRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
rp, err = c.RetentionPolicy("db0", "rp0")
if err != nil {
t.Fatal(err)
} else if rp != nil {
t.Fatalf("rp should have been dropped")
}
}
func TestMetaClient_CreateUser(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
// Create an admin user
if _, err := c.CreateUser("fred", "supersecure", true); err != nil {
t.Fatal(err)
}
// Create a non-admin user
if _, err := c.CreateUser("wilma", "password", false); err != nil {
t.Fatal(err)
}
u, err := c.User("fred")
if err != nil {
t.Fatal(err)
}
if exp, got := "fred", u.Name; exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if !u.Admin {
t.Fatalf("expected user to be admin")
}
u, err = c.Authenticate("fred", "supersecure")
if u == nil || err != nil || u.Name != "fred" {
t.Fatalf("failed to authenticate")
}
// Auth for bad password should fail
u, err = c.Authenticate("fred", "badpassword")
if u != nil || err != meta.ErrAuthenticate {
t.Fatalf("authentication should fail with %s", meta.ErrAuthenticate)
}
// Auth for no password should fail
u, err = c.Authenticate("fred", "")
if u != nil || err != meta.ErrAuthenticate {
t.Fatalf("authentication should fail with %s", meta.ErrAuthenticate)
}
// Change password should succeed.
if err := c.UpdateUser("fred", "moresupersecure"); err != nil {
t.Fatal(err)
}
// Auth for old password should fail
u, err = c.Authenticate("fred", "supersecure")
if u != nil || err != meta.ErrAuthenticate {
t.Fatalf("authentication should fail with %s", meta.ErrAuthenticate)
}
// Auth for new password should succeed.
u, err = c.Authenticate("fred", "moresupersecure")
if u == nil || err != nil || u.Name != "fred" {
t.Fatalf("failed to authenticate")
}
// Auth for unkonwn user should fail
u, err = c.Authenticate("foo", "")
if u != nil || err != meta.ErrUserNotFound {
t.Fatalf("authentication should fail with %s", meta.ErrUserNotFound)
}
u, err = c.User("wilma")
if err != nil {
t.Fatal(err)
}
if exp, got := "wilma", u.Name; exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if u.Admin {
t.Fatalf("expected user not to be an admin")
}
if exp, got := 2, c.UserCount(); exp != got {
t.Fatalf("unexpected user count. got: %d exp: %d", got, exp)
}
// Grant privilidges to a non-admin user
if err := c.SetAdminPrivilege("wilma", true); err != nil {
t.Fatal(err)
}
u, err = c.User("wilma")
if err != nil {
t.Fatal(err)
}
if exp, got := "wilma", u.Name; exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if !u.Admin {
t.Fatalf("expected user to be an admin")
}
// Revoke privilidges from user
if err := c.SetAdminPrivilege("wilma", false); err != nil {
t.Fatal(err)
}
u, err = c.User("wilma")
if err != nil {
t.Fatal(err)
}
if exp, got := "wilma", u.Name; exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if u.Admin {
t.Fatalf("expected user not to be an admin")
}
// Create a database to use for assiging privileges to.
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
// Assign a single privilege at the database level
if err := c.SetPrivilege("wilma", "db0", influxql.ReadPrivilege); err != nil {
t.Fatal(err)
}
p, err := c.UserPrivilege("wilma", "db0")
if err != nil {
t.Fatal(err)
}
if p == nil {
t.Fatal("expected privilege but was nil")
}
if exp, got := influxql.ReadPrivilege, *p; exp != got {
t.Fatalf("unexpected privilege. exp: %d, got: %d", exp, got)
}
// Remove a single privilege at the database level
if err := c.SetPrivilege("wilma", "db0", influxql.NoPrivileges); err != nil {
t.Fatal(err)
}
p, err = c.UserPrivilege("wilma", "db0")
if err != nil {
t.Fatal(err)
}
if p == nil {
t.Fatal("expected privilege but was nil")
}
if exp, got := influxql.NoPrivileges, *p; exp != got {
t.Fatalf("unexpected privilege. exp: %d, got: %d", exp, got)
}
// Drop a user
if err := c.DropUser("wilma"); err != nil {
t.Fatal(err)
}
u, err = c.User("wilma")
if err != meta.ErrUserNotFound {
t.Fatalf("user lookup should fail with %s", meta.ErrUserNotFound)
}
if exp, got := 1, c.UserCount(); exp != got {
t.Fatalf("unexpected user count. got: %d exp: %d", got, exp)
}
}
func TestMetaClient_ContinuousQueries(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
// Create a database to use
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatalf("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
// Create a CQ
if err := c.CreateContinuousQuery("db0", "cq0", `SELECT count(value) INTO foo_count FROM foo GROUP BY time(10m)`); err != nil {
t.Fatal(err)
}
// Recreate an existing CQ
if err := c.CreateContinuousQuery("db0", "cq0", `SELECT max(value) INTO foo_max FROM foo GROUP BY time(10m)`); err == nil || err.Error() != `continuous query already exists` {
t.Fatalf("unexpected error: %s", err)
}
// Create a few more CQ's
if err := c.CreateContinuousQuery("db0", "cq1", `SELECT max(value) INTO foo_max FROM foo GROUP BY time(10m)`); err != nil {
t.Fatal(err)
}
if err := c.CreateContinuousQuery("db0", "cq2", `SELECT min(value) INTO foo_min FROM foo GROUP BY time(10m)`); err != nil {
t.Fatal(err)
}
// Drop a single CQ
if err := c.DropContinuousQuery("db0", "cq1"); err != nil {
t.Fatal(err)
}
}
func TestMetaClient_Subscriptions_Create(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
// Create a database to use
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db == nil {
t.Fatal("database not found")
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
// Create a subscription
if err := c.CreateSubscription("db0", "default", "sub0", "ALL", []string{"udp://example.com:9090"}); err != nil {
t.Fatal(err)
}
// Re-create a subscription
if err := c.CreateSubscription("db0", "default", "sub0", "ALL", []string{"udp://example.com:9090"}); err == nil || err.Error() != `subscription already exists` {
t.Fatalf("unexpected error: %s", err)
}
// Create another subscription.
if err := c.CreateSubscription("db0", "default", "sub1", "ALL", []string{"udp://example.com:6060"}); err != nil {
t.Fatal(err)
}
}
func TestMetaClient_Subscriptions_Drop(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
// Create a database to use
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
// DROP SUBSCRIPTION returns ErrSubscriptionNotFound when the
// subscription is unknown.
err := c.DropSubscription("db0", "default", "foo")
if got, exp := err, meta.ErrSubscriptionNotFound; got == nil || got.Error() != exp.Error() {
t.Fatalf("got: %s, exp: %s", got, exp)
}
// Create a subscription.
if err := c.CreateSubscription("db0", "default", "sub0", "ALL", []string{"udp://example.com:9090"}); err != nil {
t.Fatal(err)
}
// DROP SUBSCRIPTION returns an influxdb.ErrDatabaseNotFound when
// the database is unknown.
err = c.DropSubscription("foo", "default", "sub0")
if got, exp := err, influxdb.ErrDatabaseNotFound("foo"); got.Error() != exp.Error() {
t.Fatalf("got: %s, exp: %s", got, exp)
}
// DROP SUBSCRIPTION returns an influxdb.ErrRetentionPolicyNotFound
// when the retention policy is unknown.
err = c.DropSubscription("db0", "foo_policy", "sub0")
if got, exp := err, influxdb.ErrRetentionPolicyNotFound("foo_policy"); got.Error() != exp.Error() {
t.Fatalf("got: %s, exp: %s", got, exp)
}
// DROP SUBSCRIPTION drops the subsciption if it can find it.
err = c.DropSubscription("db0", "default", "sub0")
if got := err; got != nil {
t.Fatalf("got: %s, exp: %v", got, nil)
}
}
func TestMetaClient_Shards(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
if _, err := c.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
// Test creating a shard group.
tmin := time.Now()
sg, err := c.CreateShardGroup("db0", "default", tmin)
if err != nil {
t.Fatal(err)
} else if sg == nil {
t.Fatalf("expected ShardGroup")
}
// Test pre-creating shard groups.
dur := sg.EndTime.Sub(sg.StartTime) + time.Nanosecond
tmax := tmin.Add(dur)
if err := c.PrecreateShardGroups(tmin, tmax); err != nil {
t.Fatal(err)
}
// Test finding shard groups by time range.
groups, err := c.ShardGroupsByTimeRange("db0", "default", tmin, tmax)
if err != nil {
t.Fatal(err)
} else if len(groups) != 2 {
t.Fatalf("wrong number of shard groups: %d", len(groups))
}
// Test finding shard owner.
db, rp, owner := c.ShardOwner(groups[0].Shards[0].ID)
if db != "db0" {
t.Fatalf("wrong db name: %s", db)
} else if rp != "default" {
t.Fatalf("wrong rp name: %s", rp)
} else if owner.ID != groups[0].ID {
t.Fatalf("wrong owner: exp %d got %d", groups[0].ID, owner.ID)
}
// Test deleting a shard group.
if err := c.DeleteShardGroup("db0", "default", groups[0].ID); err != nil {
t.Fatal(err)
} else if groups, err = c.ShardGroupsByTimeRange("db0", "default", tmin, tmax); err != nil {
t.Fatal(err)
} else if len(groups) != 1 {
t.Fatalf("wrong number of shard groups after delete: %d", len(groups))
}
}
func TestMetaClient_PersistClusterIDAfterRestart(t *testing.T) {
t.Parallel()
cfg := newConfig()
defer os.RemoveAll(cfg.Dir)
c := meta.NewClient(cfg)
if err := c.Open(); err != nil {
t.Fatal(err)
}
id := c.ClusterID()
if id == 0 {
t.Fatal("cluster ID can't be zero")
}
c = meta.NewClient(cfg)
if err := c.Open(); err != nil {
t.Fatal(err)
}
defer c.Close()
idAfter := c.ClusterID()
if idAfter == 0 {
t.Fatal("cluster ID can't be zero")
} else if idAfter != id {
t.Fatalf("cluster id not the same: %d, %d", idAfter, id)
}
}
func newClient() (string, *meta.Client) {
cfg := newConfig()
c := meta.NewClient(cfg)
if err := c.Open(); err != nil {
panic(err)
}
return cfg.Dir, c
}
func newConfig() *meta.Config {
cfg := meta.NewConfig()
cfg.Dir = testTempDir(2)
return cfg
}
func testTempDir(skip int) string {
// Get name of the calling function.
pc, _, _, ok := runtime.Caller(skip)
if !ok {
panic("failed to get name of test function")
}
_, prefix := path.Split(runtime.FuncForPC(pc).Name())
// Make a temp dir prefixed with calling function's name.
dir, err := ioutil.TempDir(os.TempDir(), prefix)
if err != nil {
panic(err)
}
return dir
}
func mustParseStatement(s string) influxql.Statement {
stmt, err := influxql.ParseStatement(s)
if err != nil {
panic(err)
}
return stmt
}
func mustMarshalJSON(v interface{}) string {
b, e := json.Marshal(v)
if e != nil {
panic(e)
}
return string(b)
}
func freePort() string {
l, _ := net.Listen("tcp", "127.0.0.1:0")
defer l.Close()
return l.Addr().String()
}
func freePorts(i int) []string {
var ports []string
for j := 0; j < i; j++ {
ports = append(ports, freePort())
}
return ports
}