From 76d291e88e31405dea92e3378ed07e9d7cc0dc8c Mon Sep 17 00:00:00 2001 From: David Norton Date: Thu, 19 Nov 2015 17:07:22 -0500 Subject: [PATCH 01/15] beginning of cluster test harness --- cmd/influxd/run/server.go | 1 + cmd/influxd/run/server_cluster_test.go | 221 +++++++++++++++++++++++++ cmd/influxd/run/server_helpers_test.go | 188 +++++++++++++++++++++ 3 files changed, 410 insertions(+) create mode 100644 cmd/influxd/run/server_cluster_test.go diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 46f939a676..76d5d6867c 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -363,6 +363,7 @@ func (s *Server) Open() error { // The port 0 is used, we need to retrieve the port assigned by the kernel if strings.HasSuffix(s.BindAddress, ":0") { s.MetaStore.Addr = ln.Addr() + s.MetaStore.RemoteAddr = ln.Addr() } // Multiplex listener. diff --git a/cmd/influxd/run/server_cluster_test.go b/cmd/influxd/run/server_cluster_test.go new file mode 100644 index 0000000000..bbb58dd5b9 --- /dev/null +++ b/cmd/influxd/run/server_cluster_test.go @@ -0,0 +1,221 @@ +package run_test + +import ( + "fmt" + "net/url" + "strings" + "testing" + "time" +) + +func TestCluster_CreateDatabase(t *testing.T) { + t.Parallel() + + c, err := NewClusterWithDefaults(5) + defer c.Close() + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } +} + +func TestCluster_Write(t *testing.T) { + t.Parallel() + + c, err := NewClusterWithDefaults(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + writes := []string{ + fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + } + + _, err = c.Servers[0].Write("db", "default", strings.Join(writes, "\n"), nil) + if err != nil { + t.Fatal(err) + } + + q := &Query{ + name: "write", + command: `SELECT * FROM db."default".cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`, + } + err = c.QueryAll(q) + if err != nil { + t.Fatal(err) + } +} +func TestCluster_DatabaseCommands(t *testing.T) { + t.Parallel() + c, err := NewCluster(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + + defer c.Close() + + test := Test{ + queries: []*Query{ + &Query{ + name: "create database should succeed", + command: `CREATE DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "create database should error with bad name", + command: `CREATE DATABASE 0xdb0`, + exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`, + }, + &Query{ + name: "show database should succeed", + command: `SHOW DATABASES`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`, + }, + &Query{ + name: "create database should error if it already exists", + command: `CREATE DATABASE db0`, + exp: `{"results":[{"error":"database already exists"}]}`, + }, + &Query{ + name: "create database should not error with existing database with IF NOT EXISTS", + command: `CREATE DATABASE IF NOT EXISTS db0`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create database should create non-existing database with IF NOT EXISTS", + command: `CREATE DATABASE IF NOT EXISTS db1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "show database should succeed", + command: `SHOW DATABASES`, + exp: 
`{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`, + }, + &Query{ + name: "drop database db0 should succeed", + command: `DROP DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "drop database db1 should succeed", + command: `DROP DATABASE db1`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "drop database should error if it does not exists", + command: `DROP DATABASE db1`, + exp: `{"results":[{"error":"database not found: db1"}]}`, + }, + &Query{ + name: "drop database should not error with non-existing database db1 WITH IF EXISTS", + command: `DROP DATABASE IF EXISTS db1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "show database should have no results", + command: `SHOW DATABASES`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`, + }, + &Query{ + name: "drop database should error if it doesn't exist", + command: `DROP DATABASE db0`, + exp: `{"results":[{"error":"database not found: db0"}]}`, + }, + }, + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } +} + +func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) { + t.Parallel() + c, err := NewClusterWithDefaults(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + writes := []string{ + fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + } + + _, err = c.Servers[0].Write("db", "default", strings.Join(writes, "\n"), nil) + if err != nil { + t.Fatal(err) + } + test := Test{ + queries: []*Query{ + &Query{ + name: "Drop database after data write", + command: `DROP DATABASE db`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "Recreate database", + command: `CREATE DATABASE db`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "Recreate retention policy", + command: `CREATE RETENTION POLICY rp0 ON db DURATION 365d REPLICATION 1 DEFAULT`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "Show measurements after recreate", + command: `SHOW MEASUREMENTS`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db"}}, + }, + &Query{ + name: "Query data after recreate", + command: `SELECT * FROM cpu`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db"}}, + }, + }, + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } + +} diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 620a48f847..c99ac1fda0 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -14,15 +14,19 @@ import ( "os" "regexp" "strings" + "sync" "testing" "time" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/influxdb/cmd/influxd/run" "github.com/influxdb/influxdb/meta" 
"github.com/influxdb/influxdb/services/httpd" "github.com/influxdb/influxdb/toml" ) +const emptyResults = `{"results":[{}]}` + // Server represents a test wrapper for run.Server. type Server struct { *run.Server @@ -48,6 +52,9 @@ func NewServer(c *run.Config) *Server { // OpenServer opens a test server. func OpenServer(c *run.Config, joinURLs string) *Server { + if len(joinURLs) > 0 { + c.Meta.Peers = strings.Split(joinURLs, ",") + } s := NewServer(c) configureLogging(s) if err := s.Open(); err != nil { @@ -294,6 +301,7 @@ type Query struct { pattern bool skip bool repeat int + once bool } // Execute runs the command and returns an err if it fails @@ -375,3 +383,183 @@ func configureLogging(s *Server) { } } } + +type Cluster struct { + Servers []*Server +} + +func NewCluster(size int) (*Cluster, error) { + c := Cluster{} + c.Servers = append(c.Servers, OpenServer(NewConfig(), "")) + raftURL := c.Servers[0].MetaStore.Addr.String() + + for i := 1; i < size; i++ { + c.Servers = append(c.Servers, OpenServer(NewConfig(), raftURL)) + } + + for _, s := range c.Servers { + configureLogging(s) + } + + r, err := c.Servers[0].Query("SHOW SERVERS") + if err != nil { + return nil, err + } + var cl client.Response + if e := json.Unmarshal([]byte(r), &cl); e != nil { + return nil, e + } + + var leaderCount int + var raftCount int + + for _, result := range cl.Results { + for _, series := range result.Series { + for i, value := range series.Values { + addr := c.Servers[i].MetaStore.Addr.String() + if value[0].(float64) != float64(i+1) { + return nil, fmt.Errorf("expected nodeID %d, got %v", i, value[0]) + } + if value[1].(string) != addr { + return nil, fmt.Errorf("expected addr %s, got %v", addr, value[1]) + } + if value[2].(bool) { + raftCount++ + } + if value[3].(bool) { + leaderCount++ + } + } + } + } + if leaderCount != 1 { + return nil, fmt.Errorf("expected 1 leader, got %d", leaderCount) + } + if size < 3 && raftCount != size { + return nil, fmt.Errorf("expected %d raft nodes, got %d", size, raftCount) + } + if size >= 3 && raftCount != 3 { + return nil, fmt.Errorf("expected 3 raft nodes, got %d", raftCount) + } + return &c, nil +} + +func NewClusterWithDefaults(size int) (*Cluster, error) { + c, err := NewCluster(size) + if err != nil { + return nil, err + } + + r, err := c.Query(&Query{command: "CREATE DATABASE db"}) + if err != nil { + return nil, err + } + if r != emptyResults { + return nil, fmt.Errorf("%s", r) + } + + for i, s := range c.Servers { + got, err := s.Query("SHOW DATABASES") + if err != nil { + return nil, fmt.Errorf("failed to query databases on node %d for show databases", i+1) + } + if exp := `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db"]]}]}]}`; got != exp { + return nil, fmt.Errorf("unexpected result node %d\nexp: %s\ngot: %s\n", i+1, exp, got) + } + } + + return c, nil +} + +// Close shuts down all servers. 
+func (c *Cluster) Close() { + var wg sync.WaitGroup + wg.Add(len(c.Servers)) + + for _, s := range c.Servers { + go func(s *Server) { + defer wg.Done() + s.Close() + }(s) + } + wg.Wait() +} + +func (c *Cluster) Query(q *Query) (string, error) { + r, e := c.Servers[0].Query(q.command) + q.act = r + return r, e +} + +func (c *Cluster) QueryIndex(index int, q string) (string, error) { + return c.Servers[index].Query(q) +} + +func (c *Cluster) QueryAll(q *Query) error { + type Response struct { + Val string + Err error + } + + timeoutErr := fmt.Errorf("timed out waiting for response") + timeout := time.After(20 * time.Second) + + queryAll := func() error { + ch := make(chan Response, 0) + + for _, s := range c.Servers { + go func(s *Server) { + r, err := s.QueryWithParams(q.command, q.params) + ch <- Response{Val: r, Err: err} + }(s) + } + + resps := []Response{} + for i := 0; i < len(c.Servers); i++ { + select { + case r := <-ch: + resps = append(resps, r) + case <-timeout: + return timeoutErr + } + } + + for _, r := range resps { + if r.Err != nil { + return r.Err + } + if q.pattern { + if !expectPattern(q.exp, r.Val) { + return fmt.Errorf("unexpected pattern: %s\n\texp: %s\n\tgot: %s\n", q.pattern, q.exp, r.Val) + } + } else { + if r.Val != q.exp { + return fmt.Errorf("unexpected value:\n\texp: %s\n\tgot: %s\n", q.exp, r.Val) + } + } + } + + return nil + } + + tick := time.Tick(100 * time.Millisecond) + if err := queryAll(); err == nil { + return nil + } else if err != timeoutErr { + return err + } + for { + select { + case <-tick: + if err := queryAll(); err == nil { + return nil + } else if err != timeoutErr { + return err + } + case <-timeout: + return fmt.Errorf("timed out waiting for response") + } + } + + return nil +} From 343f00d1f2fd07d4184a29498275f9cef71ffce2 Mon Sep 17 00:00:00 2001 From: David Norton Date: Thu, 19 Nov 2015 17:14:53 -0500 Subject: [PATCH 02/15] fix go vet errors --- cmd/influxd/run/server_helpers_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index c99ac1fda0..876f1bd74a 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -530,7 +530,7 @@ func (c *Cluster) QueryAll(q *Query) error { } if q.pattern { if !expectPattern(q.exp, r.Val) { - return fmt.Errorf("unexpected pattern: %s\n\texp: %s\n\tgot: %s\n", q.pattern, q.exp, r.Val) + return fmt.Errorf("unexpected pattern: \n\texp: %s\n\tgot: %s\n", q.exp, r.Val) } } else { if r.Val != q.exp { @@ -560,6 +560,4 @@ func (c *Cluster) QueryAll(q *Query) error { return fmt.Errorf("timed out waiting for response") } } - - return nil } From 657877dd94d0bde045975048b29e1f5a66e1636a Mon Sep 17 00:00:00 2001 From: David Norton Date: Fri, 20 Nov 2015 10:13:11 -0500 Subject: [PATCH 03/15] make not found err messages more consistent --- meta/data.go | 35 +++++++++++++++++---------------- meta/data_test.go | 13 ++++++++---- meta/errors.go | 6 ------ meta/statement_executor.go | 3 ++- meta/statement_executor_test.go | 4 +++- meta/store.go | 5 +++-- meta/store_test.go | 4 +++- monitor/service.go | 3 ++- 8 files changed, 40 insertions(+), 33 deletions(-) diff --git a/meta/data.go b/meta/data.go index 0d934c561e..1ea08b79c4 100644 --- a/meta/data.go +++ b/meta/data.go @@ -5,6 +5,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta/internal" ) @@ -174,14 +175,14 @@ func 
(data *Data) DropDatabase(name string) error { return nil } } - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(name) } // RetentionPolicy returns a retention policy for a database by name. func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) { di := data.Database(database) if di == nil { - return nil, ErrDatabaseNotFound + return nil, influxdb.ErrDatabaseNotFound(database) } for i := range di.RetentionPolicies { @@ -205,7 +206,7 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf // Find database. di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } else if di.RetentionPolicy(rpi.Name) != nil { return ErrRetentionPolicyExists } @@ -226,7 +227,7 @@ func (data *Data) DropRetentionPolicy(database, name string) error { // Find database. di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } // Prohibit dropping the default retention policy. @@ -241,7 +242,7 @@ func (data *Data) DropRetentionPolicy(database, name string) error { return nil } } - return ErrRetentionPolicyNotFound + return influxdb.ErrRetentionPolicyNotFound(name) } // UpdateRetentionPolicy updates an existing retention policy. @@ -249,13 +250,13 @@ func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPol // Find database. di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } // Find policy. rpi := di.RetentionPolicy(name) if rpi == nil { - return ErrRetentionPolicyNotFound + return influxdb.ErrRetentionPolicyNotFound(name) } // Ensure new policy doesn't match an existing policy. @@ -288,9 +289,9 @@ func (data *Data) SetDefaultRetentionPolicy(database, name string) error { // Find database and verify policy exists. di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } else if di.RetentionPolicy(name) == nil { - return ErrRetentionPolicyNotFound + return influxdb.ErrRetentionPolicyNotFound(name) } // Set default policy. @@ -306,7 +307,7 @@ func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) if err != nil { return nil, err } else if rpi == nil { - return nil, ErrRetentionPolicyNotFound + return nil, influxdb.ErrRetentionPolicyNotFound(database) } groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) for _, g := range rpi.ShardGroups { @@ -326,7 +327,7 @@ func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax tim if err != nil { return nil, err } else if rpi == nil { - return nil, ErrRetentionPolicyNotFound + return nil, influxdb.ErrRetentionPolicyNotFound(database) } groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) for _, g := range rpi.ShardGroups { @@ -345,7 +346,7 @@ func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time. if err != nil { return nil, err } else if rpi == nil { - return nil, ErrRetentionPolicyNotFound + return nil, influxdb.ErrRetentionPolicyNotFound(database) } return rpi.ShardGroupByTimestamp(timestamp), nil @@ -363,7 +364,7 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) if err != nil { return err } else if rpi == nil { - return ErrRetentionPolicyNotFound + return influxdb.ErrRetentionPolicyNotFound(database) } // Verify that shard group doesn't already exist for this timestamp. 
@@ -426,7 +427,7 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error { if err != nil { return err } else if rpi == nil { - return ErrRetentionPolicyNotFound + return influxdb.ErrRetentionPolicyNotFound(policy) } // Find shard group by ID and set its deletion timestamp. @@ -444,7 +445,7 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error { func (data *Data) CreateContinuousQuery(database, name, query string) error { di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } // Ensure the name doesn't already exist. @@ -467,7 +468,7 @@ func (data *Data) CreateContinuousQuery(database, name, query string) error { func (data *Data) DropContinuousQuery(database, name string) error { di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } for i := range di.ContinuousQueries { @@ -486,7 +487,7 @@ func (data *Data) CreateSubscription(database, rp, name, mode string, destinatio return err } if rpi == nil { - return ErrRetentionPolicyNotFound + return influxdb.ErrRetentionPolicyNotFound(rp) } // Ensure the name doesn't already exist. diff --git a/meta/data_test.go b/meta/data_test.go index ed994bbb6c..d2386d8612 100644 --- a/meta/data_test.go +++ b/meta/data_test.go @@ -10,6 +10,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/gogo/protobuf/proto" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/meta/internal" @@ -183,7 +184,8 @@ func TestData_CreateRetentionPolicy_ErrReplicationFactorTooLow(t *testing.T) { // Ensure that creating a retention policy on a non-existent database returns an error. func TestData_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}}} - if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != meta.ErrDatabaseNotFound { + expErr := influxdb.ErrDatabaseNotFound("db0") + if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err.Error() != expErr.Error() { t.Fatalf("unexpected error: %s", err) } } @@ -249,7 +251,8 @@ func TestData_DropRetentionPolicy(t *testing.T) { // Ensure an error is returned when deleting a policy from a non-existent database. func TestData_DropRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { var data meta.Data - if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound { + expErr := influxdb.ErrDatabaseNotFound("db0") + if err := data.DropRetentionPolicy("db0", "rp0"); err.Error() != expErr.Error() { t.Fatal(err) } } @@ -260,7 +263,8 @@ func TestData_DropRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { if err := data.CreateDatabase("db0"); err != nil { t.Fatal(err) } - if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrRetentionPolicyNotFound { + expErr := influxdb.ErrRetentionPolicyNotFound("rp0") + if err := data.DropRetentionPolicy("db0", "rp0"); err.Error() != expErr.Error() { t.Fatal(err) } } @@ -290,7 +294,8 @@ func TestData_RetentionPolicy(t *testing.T) { // Ensure that retrieving a policy from a non-existent database returns an error. 
func TestData_RetentionPolicy_ErrDatabaseNotFound(t *testing.T) { var data meta.Data - if _, err := data.RetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound { + expErr := influxdb.ErrDatabaseNotFound("db0") + if _, err := data.RetentionPolicy("db0", "rp0"); err.Error() != expErr.Error() { t.Fatal(err) } } diff --git a/meta/errors.go b/meta/errors.go index e32af0552b..66b57a05a2 100644 --- a/meta/errors.go +++ b/meta/errors.go @@ -39,9 +39,6 @@ var ( // ErrDatabaseExists is returned when creating an already existing database. ErrDatabaseExists = newError("database already exists") - // ErrDatabaseNotFound is returned when mutating a database that doesn't exist. - ErrDatabaseNotFound = newError("database not found") - // ErrDatabaseNameRequired is returned when creating a database without a name. ErrDatabaseNameRequired = newError("database name required") ) @@ -54,9 +51,6 @@ var ( // on a default retention policy. ErrRetentionPolicyDefault = newError("retention policy is default") - // ErrRetentionPolicyNotFound is returned when mutating a policy that doesn't exist. - ErrRetentionPolicyNotFound = newError("retention policy not found") - // ErrRetentionPolicyNameRequired is returned when creating a policy without a name. ErrRetentionPolicyNameRequired = newError("retention policy name required") diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 4150f5514f..701a266d13 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/models" ) @@ -292,7 +293,7 @@ func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.Sh if err != nil { return &influxql.Result{Err: err} } else if di == nil { - return &influxql.Result{Err: ErrDatabaseNotFound} + return &influxql.Result{Err: influxdb.ErrDatabaseNotFound(q.Database)} } row := &models.Row{Columns: []string{"name", "duration", "replicaN", "default"}} diff --git a/meta/statement_executor_test.go b/meta/statement_executor_test.go index 52a63a64fb..55d0b25c9a 100644 --- a/meta/statement_executor_test.go +++ b/meta/statement_executor_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/models" @@ -659,7 +660,8 @@ func TestStatementExecutor_ExecuteStatement_ShowRetentionPolicies_ErrDatabaseNot return nil, nil } - if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES ON db0`)); res.Err != meta.ErrDatabaseNotFound { + expErr := influxdb.ErrDatabaseNotFound("db0") + if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES ON db0`)); res.Err.Error() != expErr.Error() { t.Fatalf("unexpected error: %s", res.Err) } } diff --git a/meta/store.go b/meta/store.go index ff1fa6aeef..9cfdf731d8 100644 --- a/meta/store.go +++ b/meta/store.go @@ -21,6 +21,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/hashicorp/raft" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta/internal" "golang.org/x/crypto/bcrypt" @@ -1105,7 +1106,7 @@ func (s *Store) DefaultRetentionPolicy(database string) (rpi *RetentionPolicyInf err = s.read(func(data *Data) error { di := data.Database(database) if di == nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } for i := range 
di.RetentionPolicies { @@ -1124,7 +1125,7 @@ func (s *Store) RetentionPolicies(database string) (a []RetentionPolicyInfo, err err = s.read(func(data *Data) error { di := data.Database(database) if di != nil { - return ErrDatabaseNotFound + return influxdb.ErrDatabaseNotFound(database) } a = di.RetentionPolicies return nil diff --git a/meta/store_test.go b/meta/store_test.go index 7a56ec3038..7049225950 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tcp" "github.com/influxdb/influxdb/toml" @@ -240,7 +241,8 @@ func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) { s := MustOpenStore() defer s.Close() - if err := s.DropDatabase("no_such_database"); err != meta.ErrDatabaseNotFound { + expErr := influxdb.ErrDatabaseNotFound("no_such_database") + if err := s.DropDatabase("no_such_database"); err.Error() != expErr.Error() { t.Fatalf("unexpected error: %s", err) } } diff --git a/monitor/service.go b/monitor/service.go index 4aeb1d909d..51a8970083 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/models" @@ -329,7 +330,7 @@ func (m *Monitor) createInternalStorage() { return } - if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != meta.ErrRetentionPolicyNotFound { + if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != influxdb.ErrRetentionPolicyNotFound("default") { m.Logger.Printf("failed to delete retention policy 'default', failed to created internal storage: %s", err.Error()) return } From 320856419227ae1d83ba0c50928a7c2824b25ad8 Mon Sep 17 00:00:00 2001 From: David Norton Date: Fri, 20 Nov 2015 11:20:39 -0500 Subject: [PATCH 04/15] fix RP not found error messages --- cmd/influxd/run/server_test.go | 4 ++-- meta/data.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 1476796508..57ea835a32 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -550,7 +550,7 @@ func TestServer_RetentionPolicyCommands(t *testing.T) { &Query{ name: "Check error when deleting retention policy on non-existent database", command: `DROP RETENTION POLICY rp1 ON mydatabase`, - exp: `{"results":[{"error":"database not found"}]}`, + exp: `{"results":[{"error":"database not found: mydatabase"}]}`, }, }, } @@ -1635,7 +1635,7 @@ func TestServer_Query_Common(t *testing.T) { &Query{ name: "selecting a from a non-existent retention policy should error", command: `SELECT value FROM db0.rp1.cpu`, - exp: `{"results":[{"error":"retention policy not found"}]}`, + exp: `{"results":[{"error":"retention policy not found: rp1"}]}`, }, &Query{ name: "selecting a valid measurement and field should succeed", diff --git a/meta/data.go b/meta/data.go index 1ea08b79c4..edbb555b30 100644 --- a/meta/data.go +++ b/meta/data.go @@ -307,7 +307,7 @@ func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) if err != nil { return nil, err } else if rpi == nil { - return nil, influxdb.ErrRetentionPolicyNotFound(database) + return nil, influxdb.ErrRetentionPolicyNotFound(policy) } groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) for _, g := range rpi.ShardGroups { @@ -327,7 
+327,7 @@ func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax tim if err != nil { return nil, err } else if rpi == nil { - return nil, influxdb.ErrRetentionPolicyNotFound(database) + return nil, influxdb.ErrRetentionPolicyNotFound(policy) } groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) for _, g := range rpi.ShardGroups { @@ -346,7 +346,7 @@ func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time. if err != nil { return nil, err } else if rpi == nil { - return nil, influxdb.ErrRetentionPolicyNotFound(database) + return nil, influxdb.ErrRetentionPolicyNotFound(policy) } return rpi.ShardGroupByTimestamp(timestamp), nil @@ -364,7 +364,7 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) if err != nil { return err } else if rpi == nil { - return influxdb.ErrRetentionPolicyNotFound(database) + return influxdb.ErrRetentionPolicyNotFound(policy) } // Verify that shard group doesn't already exist for this timestamp. From 35b438f8fa186b85a3e3104bc72ec5b0f7966dcf Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 30 Nov 2015 13:49:35 -0600 Subject: [PATCH 05/15] silence meta/rpc logging for testing --- meta/store.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/meta/store.go b/meta/store.go index 9cfdf731d8..7b4127f5fe 100644 --- a/meta/store.go +++ b/meta/store.go @@ -172,6 +172,14 @@ func NewStore(c *Config) *Store { return s } +// SetLogger sets the internal logger to the logger passed in. +func (s *Store) SetLogger(l *log.Logger) { + s.Logger = l + if s.rpc != nil { + s.rpc.logger = l + } +} + // Path returns the root path when open. // Returns an empty string when the store is closed. func (s *Store) Path() string { return s.path } From cf4e9010785b8bcb1504f9d482c650d297793f06 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 30 Nov 2015 13:49:50 -0600 Subject: [PATCH 06/15] silence registration logging for testing --- services/registration/service.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/registration/service.go b/services/registration/service.go index 587c2e7bf8..cf23350f09 100644 --- a/services/registration/service.go +++ b/services/registration/service.go @@ -85,6 +85,11 @@ func (s *Service) Close() error { return nil } +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { + s.logger = l +} + // Diagnostics returns diagnostics information. 
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) { diagnostics := map[string]interface{}{ From 967a53cabd6c26cc2ec28814c086a3551f1671b2 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 30 Nov 2015 13:51:03 -0600 Subject: [PATCH 07/15] start sharing integration tests for cluster tests --- cmd/influxd/run/server_cluster_test.go | 166 ++++++------------- cmd/influxd/run/server_helpers_test.go | 50 +++++- cmd/influxd/run/server_suite_test.go | 214 +++++++++++++++++++++++++ cmd/influxd/run/server_test.go | 190 +--------------------- 4 files changed, 316 insertions(+), 304 deletions(-) create mode 100644 cmd/influxd/run/server_suite_test.go diff --git a/cmd/influxd/run/server_cluster_test.go b/cmd/influxd/run/server_cluster_test.go index bbb58dd5b9..b7443e5b59 100644 --- a/cmd/influxd/run/server_cluster_test.go +++ b/cmd/influxd/run/server_cluster_test.go @@ -2,7 +2,6 @@ package run_test import ( "fmt" - "net/url" "strings" "testing" "time" @@ -31,14 +30,14 @@ func TestCluster_Write(t *testing.T) { fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), } - _, err = c.Servers[0].Write("db", "default", strings.Join(writes, "\n"), nil) + _, err = c.Servers[0].Write("db0", "default", strings.Join(writes, "\n"), nil) if err != nil { t.Fatal(err) } q := &Query{ name: "write", - command: `SELECT * FROM db."default".cpu`, + command: `SELECT * FROM db0."default".cpu`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`, } err = c.QueryAll(q) @@ -46,6 +45,7 @@ func TestCluster_Write(t *testing.T) { t.Fatal(err) } } + func TestCluster_DatabaseCommands(t *testing.T) { t.Parallel() c, err := NewCluster(5) @@ -55,78 +55,7 @@ func TestCluster_DatabaseCommands(t *testing.T) { defer c.Close() - test := Test{ - queries: []*Query{ - &Query{ - name: "create database should succeed", - command: `CREATE DATABASE db0`, - exp: `{"results":[{}]}`, - once: true, - }, - &Query{ - name: "create database should error with bad name", - command: `CREATE DATABASE 0xdb0`, - exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`, - }, - &Query{ - name: "show database should succeed", - command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`, - }, - &Query{ - name: "create database should error if it already exists", - command: `CREATE DATABASE db0`, - exp: `{"results":[{"error":"database already exists"}]}`, - }, - &Query{ - name: "create database should not error with existing database with IF NOT EXISTS", - command: `CREATE DATABASE IF NOT EXISTS db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create database should create non-existing database with IF NOT EXISTS", - command: `CREATE DATABASE IF NOT EXISTS db1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show database should succeed", - command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`, - }, - &Query{ - name: "drop database db0 should succeed", - command: `DROP DATABASE db0`, - exp: `{"results":[{}]}`, - once: true, - }, - &Query{ - name: "drop database db1 should succeed", - command: `DROP DATABASE db1`, - exp: `{"results":[{}]}`, - once: true, - }, - &Query{ - name: "drop database should error if it does not exists", - command: `DROP DATABASE db1`, - exp: `{"results":[{"error":"database not found: 
db1"}]}`, - }, - &Query{ - name: "drop database should not error with non-existing database db1 WITH IF EXISTS", - command: `DROP DATABASE IF EXISTS db1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show database should have no results", - command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`, - }, - &Query{ - name: "drop database should error if it doesn't exist", - command: `DROP DATABASE db0`, - exp: `{"results":[{"error":"database not found: db0"}]}`, - }, - }, - } + test := tests.load(t, "database_commands") for _, query := range test.queries { if query.skip { @@ -156,48 +85,58 @@ func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) { } defer c.Close() - writes := []string{ - fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - } + test := tests.load(t, "drop_and_recreate_database") - _, err = c.Servers[0].Write("db", "default", strings.Join(writes, "\n"), nil) + _, err = c.Servers[0].Write(test.database(), test.retentionPolicy(), test.write, nil) + if err != nil { + t.Fatal(err) + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } +} + +func TestCluster_Query_DropDatabaseIsolated(t *testing.T) { + t.Parallel() + c, err := NewCluster(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + test := tests.load(t, "drop_database_isolated") + + s := c.Servers[0] + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp1", 1, 0)); err != nil { + t.Fatal(err) + } + + _, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil) if err != nil { t.Fatal(err) } - test := Test{ - queries: []*Query{ - &Query{ - name: "Drop database after data write", - command: `DROP DATABASE db`, - exp: `{"results":[{}]}`, - once: true, - }, - &Query{ - name: "Recreate database", - command: `CREATE DATABASE db`, - exp: `{"results":[{}]}`, - once: true, - }, - &Query{ - name: "Recreate retention policy", - command: `CREATE RETENTION POLICY rp0 ON db DURATION 365d REPLICATION 1 DEFAULT`, - exp: `{"results":[{}]}`, - once: true, - }, - &Query{ - name: "Show measurements after recreate", - command: `SHOW MEASUREMENTS`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db"}}, - }, - &Query{ - name: "Query data after recreate", - command: `SELECT * FROM cpu`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db"}}, - }, - }, - } for _, query := range test.queries { if query.skip { @@ -217,5 +156,4 @@ func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) { t.Error(query.Error(err)) } } - } diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 876f1bd74a..6ed01b2841 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -329,6 +329,8 @@ func (q *Query) failureMessage() string { return fmt.Sprintf("%s: unexpected results\nquery: 
%s\nexp: %s\nactual: %s\n", q.name, q.command, q.exp, q.act) } +type Tests map[string]Test + type Test struct { initialized bool write string @@ -346,15 +348,57 @@ func NewTest(db, rp string) Test { } } +func (t Test) duplicate() Test { + test := Test{ + initialized: t.initialized, + write: t.write, + params: t.params, + db: t.db, + rp: t.rp, + exp: t.exp, + queries: make([]*Query, len(t.queries)), + } + copy(test.queries, t.queries) + return test +} + +func (t *Test) addWrite(s ...string) { + if len(t.write) > 0 { + t.write += "\n" + } + t.write = strings.Join(s, "\n") +} + func (t *Test) addQueries(q ...*Query) { t.queries = append(t.queries, q...) } +func (t *Test) database() string { + if t.db != "" { + return t.db + } + return "db0" +} + +func (t *Test) retentionPolicy() string { + if t.rp != "" { + return t.rp + } + return "default" +} + func (t *Test) init(s *Server) error { if t.write == "" || t.initialized { return nil } t.initialized = true + if t.db == "" { + t.db = "db0" + } + if t.rp == "" { + t.rp = "rp0" + } + if res, err := s.Write(t.db, t.rp, t.write, t.params); err != nil { return err } else if t.exp != res { @@ -370,7 +414,7 @@ func configureLogging(s *Server) { SetLogger(*log.Logger) } nullLogger := log.New(ioutil.Discard, "", 0) - s.MetaStore.Logger = nullLogger + s.MetaStore.SetLogger(nullLogger) s.TSDBStore.Logger = nullLogger s.HintedHandoff.SetLogger(nullLogger) s.Monitor.SetLogger(nullLogger) @@ -450,7 +494,7 @@ func NewClusterWithDefaults(size int) (*Cluster, error) { return nil, err } - r, err := c.Query(&Query{command: "CREATE DATABASE db"}) + r, err := c.Query(&Query{command: "CREATE DATABASE db0"}) if err != nil { return nil, err } @@ -463,7 +507,7 @@ func NewClusterWithDefaults(size int) (*Cluster, error) { if err != nil { return nil, fmt.Errorf("failed to query databases on node %d for show databases", i+1) } - if exp := `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db"]]}]}]}`; got != exp { + if exp := `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`; got != exp { return nil, fmt.Errorf("unexpected result node %d\nexp: %s\ngot: %s\n", i+1, exp, got) } } diff --git a/cmd/influxd/run/server_suite_test.go b/cmd/influxd/run/server_suite_test.go new file mode 100644 index 0000000000..be87dfaddf --- /dev/null +++ b/cmd/influxd/run/server_suite_test.go @@ -0,0 +1,214 @@ +package run_test + +import ( + "fmt" + "net/url" + "testing" + "time" +) + +var tests Tests + +// Load all shared tests +func init() { + tests = make(map[string]Test) + + tests["database_commands"] = Test{ + queries: []*Query{ + &Query{ + name: "create database should succeed", + command: `CREATE DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "create database with retention duration should succeed", + command: `CREATE DATABASE db0_r WITH DURATION 24h REPLICATION 2 NAME db0_r_policy`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "create database should error with bad name", + command: `CREATE DATABASE 0xdb0`, + exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`, + }, + &Query{ + name: "create database with retention duration should error with bad retention duration", + command: `CREATE DATABASE db0 WITH DURATION xyz`, + exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 35"}`, + }, + &Query{ + name: "create database with retention replication should error with bad retention replication number", + command: `CREATE 
DATABASE db0 WITH REPLICATION xyz`, + exp: `{"error":"error parsing query: found xyz, expected number at line 1, char 38"}`, + }, + &Query{ + name: "create database with retention name should error with missing retention name", + command: `CREATE DATABASE db0 WITH NAME`, + exp: `{"error":"error parsing query: found EOF, expected identifier at line 1, char 31"}`, + }, + &Query{ + name: "show database should succeed", + command: `SHOW DATABASES`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`, + }, + &Query{ + name: "create database should error if it already exists", + command: `CREATE DATABASE db0`, + exp: `{"results":[{"error":"database already exists"}]}`, + }, + &Query{ + name: "create database should error if it already exists", + command: `CREATE DATABASE db0_r`, + exp: `{"results":[{"error":"database already exists"}]}`, + }, + &Query{ + name: "create database should not error with existing database with IF NOT EXISTS", + command: `CREATE DATABASE IF NOT EXISTS db0`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create database should create non-existing database with IF NOT EXISTS", + command: `CREATE DATABASE IF NOT EXISTS db1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create database with retention duration should not error with existing database with IF NOT EXISTS", + command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION 24h`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create database should error IF NOT EXISTS with bad retention duration", + command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION xyz`, + exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 49"}`, + }, + &Query{ + name: "show database should succeed", + command: `SHOW DATABASES`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`, + }, + &Query{ + name: "drop database db0 should succeed", + command: `DROP DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "drop database db0_r should succeed", + command: `DROP DATABASE db0_r`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "drop database db1 should succeed", + command: `DROP DATABASE db1`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "drop database should error if it does not exists", + command: `DROP DATABASE db1`, + exp: `{"results":[{"error":"database not found: db1"}]}`, + }, + &Query{ + name: "drop database should not error with non-existing database db1 WITH IF EXISTS", + command: `DROP DATABASE IF EXISTS db1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "show database should have no results", + command: `SHOW DATABASES`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`, + }, + &Query{ + name: "drop database should error if it doesn't exist", + command: `DROP DATABASE db0`, + exp: `{"results":[{"error":"database not found: db0"}]}`, + }, + }, + } + + tests["drop_and_recreate_database"] = Test{ + write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + queries: []*Query{ + &Query{ + name: "Drop database after data write", + command: `DROP DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "Recreate database", + command: `CREATE DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "Recreate retention policy", + command: `CREATE RETENTION POLICY rp0 ON db0 
DURATION 365d REPLICATION 1 DEFAULT`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "Show measurements after recreate", + command: `SHOW MEASUREMENTS`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Query data after recreate", + command: `SELECT * FROM cpu`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }, + } + + tests["drop_database_isolated"] = Test{ + db: "db0", + rp: "rp0", + write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + queries: []*Query{ + &Query{ + name: "Query data from 1st database", + command: `SELECT * FROM cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Query data from 1st database with GROUP BY *", + command: `SELECT * FROM cpu GROUP BY *`, + exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop other database", + command: `DROP DATABASE db1`, + once: true, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "Query data from 1st database and ensure it's still there", + command: `SELECT * FROM cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Query data from 1st database and ensure it's still there with GROUP BY *", + command: `SELECT * FROM cpu GROUP BY *`, + exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }, + } + +} + +func (tests Tests) load(t *testing.T, key string) Test { + test, ok := tests[key] + if !ok { + t.Fatalf("no test %q", key) + } + + return test.duplicate() +} diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 57ea835a32..90960097e6 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -31,115 +31,7 @@ func TestServer_DatabaseCommands(t *testing.T) { s := OpenServer(NewConfig(), "") defer s.Close() - test := Test{ - queries: []*Query{ - &Query{ - name: "create database should succeed", - command: `CREATE DATABASE db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create database with retention duration should succeed", - command: `CREATE DATABASE db0_r WITH DURATION 24h REPLICATION 2 NAME db0_r_policy`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create database should error with bad name", - command: `CREATE DATABASE 0xdb0`, - exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`, - }, - &Query{ - name: "create database with retention duration should error with bad retention duration", - command: `CREATE DATABASE db0 WITH DURATION xyz`, - exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 35"}`, - }, - &Query{ - name: "create database with retention replication should error with bad retention replication number", - command: `CREATE DATABASE db0 WITH REPLICATION xyz`, - exp: `{"error":"error parsing query: found xyz, expected number at line 1, char 
38"}`, - }, - &Query{ - name: "create database with retention name should error with missing retention name", - command: `CREATE DATABASE db0 WITH NAME`, - exp: `{"error":"error parsing query: found EOF, expected identifier at line 1, char 31"}`, - }, - &Query{ - name: "show database should succeed", - command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`, - }, - &Query{ - name: "create database should error if it already exists", - command: `CREATE DATABASE db0`, - exp: `{"results":[{"error":"database already exists"}]}`, - }, - &Query{ - name: "create database should error if it already exists", - command: `CREATE DATABASE db0_r`, - exp: `{"results":[{"error":"database already exists"}]}`, - }, - &Query{ - name: "create database should not error with existing database with IF NOT EXISTS", - command: `CREATE DATABASE IF NOT EXISTS db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create database should create non-existing database with IF NOT EXISTS", - command: `CREATE DATABASE IF NOT EXISTS db1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create database with retention duration should not error with existing database with IF NOT EXISTS", - command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION 24h`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create database should error IF NOT EXISTS with bad retention duration", - command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION xyz`, - exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 49"}`, - }, - &Query{ - name: "show database should succeed", - command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`, - }, - &Query{ - name: "drop database db0 should succeed", - command: `DROP DATABASE db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "drop database db0_r should succeed", - command: `DROP DATABASE db0_r`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "drop database db1 should succeed", - command: `DROP DATABASE db1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "drop database should error if it does not exists", - command: `DROP DATABASE db1`, - exp: `{"results":[{"error":"database not found: db1"}]}`, - }, - &Query{ - name: "drop database should not error with non-existing database db1 WITH IF EXISTS", - command: `DROP DATABASE IF EXISTS db1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show database should have no results", - command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`, - }, - &Query{ - name: "drop database should error if it doesn't exist", - command: `DROP DATABASE db0`, - exp: `{"results":[{"error":"database not found: db0"}]}`, - }, - }, - } + test := tests.load(t, "database_commands") for _, query := range test.queries { if query.skip { @@ -166,42 +58,7 @@ func TestServer_Query_DropAndRecreateDatabase(t *testing.T) { t.Fatal(err) } - writes := []string{ - fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - } - - test := NewTest("db0", "rp0") - test.write = strings.Join(writes, "\n") - - test.addQueries([]*Query{ - &Query{ - name: "Drop database after data write", - command: `DROP DATABASE db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "Recreate database", - command: `CREATE DATABASE db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "Recreate 
retention policy", - command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 365d REPLICATION 1 DEFAULT`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "Show measurements after recreate", - command: `SHOW MEASUREMENTS`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Query data after recreate", - command: `SELECT * FROM cpu`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - }...) + test := tests.load(t, "drop_and_recreate_database") for i, query := range test.queries { if i == 0 { @@ -236,44 +93,7 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) { t.Fatal(err) } - writes := []string{ - fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - } - - test := NewTest("db0", "rp0") - test.write = strings.Join(writes, "\n") - - test.addQueries([]*Query{ - &Query{ - name: "Query data from 1st database", - command: `SELECT * FROM cpu`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Query data from 1st database with GROUP BY *", - command: `SELECT * FROM cpu GROUP BY *`, - exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Drop other database", - command: `DROP DATABASE db1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "Query data from 1st database and ensure it's still there", - command: `SELECT * FROM cpu`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Query data from 1st database and ensure it's still there with GROUP BY *", - command: `SELECT * FROM cpu GROUP BY *`, - exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - }...) 
+ test := tests.load(t, "drop_database_isolated") for i, query := range test.queries { if i == 0 { @@ -3313,10 +3133,6 @@ func TestServer_Query_TopInt(t *testing.T) { continue } - println(">>>>", query.name) - if query.name != `top - memory - host tag with limit 2` { // FIXME: temporary - continue - } if err := query.Execute(s); err != nil { t.Error(query.Error(err)) } else if !query.success() { From 6e845839c8fe31a93fe727743b2723ab8f6f8cfa Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 1 Dec 2015 09:08:16 -0600 Subject: [PATCH 08/15] fix flakey test harness --- cmd/influxd/run/server_helpers_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 6ed01b2841..d8d40f7a91 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -546,9 +546,10 @@ func (c *Cluster) QueryAll(q *Query) error { } timeoutErr := fmt.Errorf("timed out waiting for response") - timeout := time.After(20 * time.Second) queryAll := func() error { + // if a server doesn't return in 5 seconds, fail the response + timeout := time.After(5 * time.Second) ch := make(chan Response, 0) for _, s := range c.Servers { @@ -587,18 +588,17 @@ func (c *Cluster) QueryAll(q *Query) error { } tick := time.Tick(100 * time.Millisecond) + // if we don't reach consensus in 20 seconds, fail the query + timeout := time.After(20 * time.Second) + if err := queryAll(); err == nil { return nil - } else if err != timeoutErr { - return err } for { select { case <-tick: if err := queryAll(); err == nil { return nil - } else if err != timeoutErr { - return err } case <-timeout: return fmt.Errorf("timed out waiting for response") From 91da96eb34eca16148a06f3bdeac356e7136da0e Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 1 Dec 2015 09:55:16 -0600 Subject: [PATCH 09/15] fix race in test harness --- cmd/influxd/run/server_helpers_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index d8d40f7a91..3c1be880f5 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -130,11 +130,14 @@ func (s *Server) Query(query string) (results string, err error) { // Query executes a query against the server and returns the results. func (s *Server) QueryWithParams(query string, values url.Values) (results string, err error) { + var v url.Values if values == nil { - values = url.Values{} + v = url.Values{} + } else { + v, _ = url.ParseQuery(values.Encode()) } - values.Set("q", query) - return s.HTTPGet(s.URL() + "/query?" + values.Encode()) + v.Set("q", query) + return s.HTTPGet(s.URL() + "/query?" + v.Encode()) } // HTTPGet makes an HTTP GET request to the server and returns the response. 
@@ -589,7 +592,7 @@ func (c *Cluster) QueryAll(q *Query) error { tick := time.Tick(100 * time.Millisecond) // if we don't reach consensus in 20 seconds, fail the query - timeout := time.After(20 * time.Second) + timeout := time.After(10 * time.Second) if err := queryAll(); err == nil { return nil From 23435299cac6fa45b1b4840fc7c7fb2cf111db19 Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 1 Dec 2015 13:45:24 -0500 Subject: [PATCH 10/15] disable test parallelism --- circle-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/circle-test.sh b/circle-test.sh index f4ac8d7db2..04a4247000 100755 --- a/circle-test.sh +++ b/circle-test.sh @@ -6,7 +6,7 @@ BUILD_DIR=$HOME/influxdb-build GO_VERSION=go1.4.2 -PARALLELISM="-parallel 256" +PARALLELISM="-parallel 1" TIMEOUT="-timeout 480s" # Executes the given statement, and exits if the command returns a non-zero code. From ab57bde115b49aa4f6acdeb1e85ffed333c0aed5 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 1 Dec 2015 14:21:02 -0600 Subject: [PATCH 11/15] more tests, some refactoring --- cmd/influxd/run/server_cluster_test.go | 124 +++++++++++++++++++++- cmd/influxd/run/server_helpers_test.go | 2 +- cmd/influxd/run/server_suite_test.go | 106 +++++++++++++++++++ cmd/influxd/run/server_test.go | 139 ++++--------------------- 4 files changed, 252 insertions(+), 119 deletions(-) diff --git a/cmd/influxd/run/server_cluster_test.go b/cmd/influxd/run/server_cluster_test.go index b7443e5b59..02264e0529 100644 --- a/cmd/influxd/run/server_cluster_test.go +++ b/cmd/influxd/run/server_cluster_test.go @@ -79,7 +79,7 @@ func TestCluster_DatabaseCommands(t *testing.T) { func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) { t.Parallel() - c, err := NewClusterWithDefaults(5) + c, err := NewCluster(5) if err != nil { t.Fatalf("error creating cluster: %s", err) } @@ -87,6 +87,14 @@ func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) { test := tests.load(t, "drop_and_recreate_database") + s := c.Servers[0] + if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil { + t.Fatal(err) + } + _, err = c.Servers[0].Write(test.database(), test.retentionPolicy(), test.write, nil) if err != nil { t.Fatal(err) @@ -157,3 +165,117 @@ func TestCluster_Query_DropDatabaseIsolated(t *testing.T) { } } } + +func TestCluster_Query_DropAndRecreateSeries(t *testing.T) { + t.Parallel() + t.Skip() + c, err := NewCluster(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + test := tests.load(t, "drop_and_recreate_series") + + s := c.Servers[0] + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + _, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil) + if err != nil { + t.Fatal(err) + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Fatal(query.Error(err)) + } + } + + // Re-write data and test again. 
+ retest := tests.load(t, "drop_and_recreate_series_retest") + + _, err = s.Write(retest.database(), retest.retentionPolicy(), retest.write, nil) + if err != nil { + t.Fatal(err) + } + for _, query := range retest.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } +} + +func TestCluster_Query_DropSeriesFromRegex(t *testing.T) { + t.Parallel() + t.Skip() + c, err := NewCluster(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + test := tests.load(t, "drop_series_from_regex") + + s := c.Servers[0] + if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil { + t.Fatal(err) + } + + _, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil) + if err != nil { + t.Fatal(err) + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } +} diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 3c1be880f5..bd8b3e75ff 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -592,7 +592,7 @@ func (c *Cluster) QueryAll(q *Query) error { tick := time.Tick(100 * time.Millisecond) // if we don't reach consensus in 20 seconds, fail the query - timeout := time.After(10 * time.Second) + timeout := time.After(20 * time.Second) if err := queryAll(); err == nil { return nil diff --git a/cmd/influxd/run/server_suite_test.go b/cmd/influxd/run/server_suite_test.go index be87dfaddf..f0146de44a 100644 --- a/cmd/influxd/run/server_suite_test.go +++ b/cmd/influxd/run/server_suite_test.go @@ -3,6 +3,7 @@ package run_test import ( "fmt" "net/url" + "strings" "testing" "time" ) @@ -129,6 +130,8 @@ func init() { } tests["drop_and_recreate_database"] = Test{ + db: "db0", + rp: "rp0", write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), queries: []*Query{ &Query{ @@ -202,6 +205,109 @@ func init() { }, } + tests["drop_and_recreate_series"] = Test{ + db: "db0", + rp: "rp0", + write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + queries: []*Query{ + &Query{ + name: "Show series is present", + command: `SHOW SERIES`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop series after data write", + command: `DROP SERIES FROM cpu`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db0"}}, + once: true, + }, + &Query{ + name: "Show series is gone", + command: `SHOW SERIES`, + exp: `{"results":[{}]}`, + params: 
url.Values{"db": []string{"db0"}}, + }, + }, + } + tests["drop_and_recreate_series_retest"] = Test{ + db: "db0", + rp: "rp0", + write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + queries: []*Query{ + &Query{ + name: "Show series is present again after re-write", + command: `SHOW SERIES`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }, + } + + tests["drop_series_from_regex"] = Test{ + db: "db0", + rp: "rp0", + write: strings.Join([]string{ + fmt.Sprintf(`a,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`aa,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`b,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`c,host=serverA,region=uswest val=30.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + }, "\n"), + queries: []*Query{ + &Query{ + name: "Show series is present", + command: `SHOW SERIES`, + exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop series after data write", + command: `DROP SERIES FROM /a.*/`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db0"}}, + once: true, + }, + &Query{ + name: "Show series is gone", + command: `SHOW SERIES`, + exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop series from regex that matches no measurements", + command: `DROP SERIES FROM /a.*/`, + exp: `{"results":[{}]}`, + params: url.Values{"db": []string{"db0"}}, + once: true, + }, + &Query{ + name: "make sure DROP SERIES doesn't delete anything when regex doesn't match", + command: `SHOW SERIES`, + exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop series with WHERE field should error", + command: `DROP SERIES FROM c WHERE val > 50.0`, + exp: `{"results":[{"error":"DROP SERIES doesn't support fields in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "make sure DROP SERIES with field in WHERE didn't delete data", + command: `SHOW SERIES`, + exp: 
`{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "Drop series with WHERE time should error", + command: `DROP SERIES FROM c WHERE time > now() - 1d`, + exp: `{"results":[{"error":"DROP SERIES doesn't support time in WHERE clause"}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }, + } + } func (tests Tests) load(t *testing.T, key string) Test { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 90960097e6..a2344e8b92 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -51,15 +51,15 @@ func TestServer_Query_DropAndRecreateDatabase(t *testing.T) { s := OpenServer(NewConfig(), "") defer s.Close() - if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { - t.Fatal(err) - } - if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { - t.Fatal(err) - } - test := tests.load(t, "drop_and_recreate_database") + if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil { + t.Fatal(err) + } + for i, query := range test.queries { if i == 0 { if err := test.init(s); err != nil { @@ -83,18 +83,18 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) { s := OpenServer(NewConfig(), "") defer s.Close() - if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + test := tests.load(t, "drop_database_isolated") + + if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil { t.Fatal(err) } - if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil { t.Fatal(err) } if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp1", 1, 0)); err != nil { t.Fatal(err) } - test := tests.load(t, "drop_database_isolated") - for i, query := range test.queries { if i == 0 { if err := test.init(s); err != nil { @@ -118,41 +118,15 @@ func TestServer_Query_DropAndRecreateSeries(t *testing.T) { s := OpenServer(NewConfig(), "") defer s.Close() - if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + test := tests.load(t, "drop_and_recreate_series") + + if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil { t.Fatal(err) } - if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil { t.Fatal(err) } - writes := []string{ - fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - } - - test := NewTest("db0", "rp0") - test.write = strings.Join(writes, "\n") - - test.addQueries([]*Query{ - &Query{ - name: "Show series is present", - command: `SHOW SERIES`, - exp: 
`{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Drop series after data write", - command: `DROP SERIES FROM cpu`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Show series is gone", - command: `SHOW SERIES`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - }...) - for i, query := range test.queries { if i == 0 { if err := test.init(s); err != nil { @@ -171,21 +145,11 @@ func TestServer_Query_DropAndRecreateSeries(t *testing.T) { } // Re-write data and test again. - reTest := NewTest("db0", "rp0") - reTest.write = strings.Join(writes, "\n") + retest := tests.load(t, "drop_and_recreate_series_retest") - reTest.addQueries([]*Query{ - &Query{ - name: "Show series is present again after re-write", - command: `SHOW SERIES`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - }...) - - for i, query := range reTest.queries { + for i, query := range retest.queries { if i == 0 { - if err := reTest.init(s); err != nil { + if err := retest.init(s); err != nil { t.Fatalf("test init failed: %s", err) } } @@ -206,74 +170,15 @@ func TestServer_Query_DropSeriesFromRegex(t *testing.T) { s := OpenServer(NewConfig(), "") defer s.Close() - if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + test := tests.load(t, "drop_series_from_regex") + + if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil { t.Fatal(err) } - if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil { t.Fatal(err) } - writes := []string{ - fmt.Sprintf(`a,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - fmt.Sprintf(`aa,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - fmt.Sprintf(`b,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - fmt.Sprintf(`c,host=serverA,region=uswest val=30.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), - } - - test := NewTest("db0", "rp0") - test.write = strings.Join(writes, "\n") - - test.addQueries([]*Query{ - &Query{ - name: "Show series is present", - command: `SHOW SERIES`, - exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Drop series after data write", - command: `DROP SERIES FROM /a.*/`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Show series is gone", - command: `SHOW SERIES`, - exp: 
`{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Drop series from regex that matches no measurements", - command: `DROP SERIES FROM /a.*/`, - exp: `{"results":[{}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "make sure DROP SERIES doesn't delete anything when regex doesn't match", - command: `SHOW SERIES`, - exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Drop series with WHERE field should error", - command: `DROP SERIES FROM c WHERE val > 50.0`, - exp: `{"results":[{"error":"DROP SERIES doesn't support fields in WHERE clause"}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "make sure DROP SERIES with field in WHERE didn't delete data", - command: `SHOW SERIES`, - exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - &Query{ - name: "Drop series with WHERE time should error", - command: `DROP SERIES FROM c WHERE time > now() - 1d`, - exp: `{"results":[{"error":"DROP SERIES doesn't support time in WHERE clause"}]}`, - params: url.Values{"db": []string{"db0"}}, - }, - }...) 
- for i, query := range test.queries { if i == 0 { if err := test.init(s); err != nil { From 96c230d968eb7e4d9d6524a48202098f7e50a43c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 1 Dec 2015 14:47:41 -0600 Subject: [PATCH 12/15] allow for configurable cluster creation --- cmd/influxd/run/server_helpers_test.go | 51 ++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index bd8b3e75ff..be00257351 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -448,13 +448,21 @@ func NewCluster(size int) (*Cluster, error) { configureLogging(s) } + if err := verifyCluster(&c, size); err != nil { + return nil, err + } + + return &c, nil +} + +func verifyCluster(c *Cluster, size int) error { r, err := c.Servers[0].Query("SHOW SERVERS") if err != nil { - return nil, err + return err } var cl client.Response if e := json.Unmarshal([]byte(r), &cl); e != nil { - return nil, e + return e } var leaderCount int @@ -465,10 +473,10 @@ func NewCluster(size int) (*Cluster, error) { for i, value := range series.Values { addr := c.Servers[i].MetaStore.Addr.String() if value[0].(float64) != float64(i+1) { - return nil, fmt.Errorf("expected nodeID %d, got %v", i, value[0]) + return fmt.Errorf("expected nodeID %d, got %v", i, value[0]) } if value[1].(string) != addr { - return nil, fmt.Errorf("expected addr %s, got %v", addr, value[1]) + return fmt.Errorf("expected addr %s, got %v", addr, value[1]) } if value[2].(bool) { raftCount++ @@ -480,15 +488,16 @@ func NewCluster(size int) (*Cluster, error) { } } if leaderCount != 1 { - return nil, fmt.Errorf("expected 1 leader, got %d", leaderCount) + return fmt.Errorf("expected 1 leader, got %d", leaderCount) } if size < 3 && raftCount != size { - return nil, fmt.Errorf("expected %d raft nodes, got %d", size, raftCount) + return fmt.Errorf("expected %d raft nodes, got %d", size, raftCount) } if size >= 3 && raftCount != 3 { - return nil, fmt.Errorf("expected 3 raft nodes, got %d", raftCount) + return fmt.Errorf("expected 3 raft nodes, got %d", raftCount) } - return &c, nil + + return nil } func NewClusterWithDefaults(size int) (*Cluster, error) { @@ -518,6 +527,32 @@ func NewClusterWithDefaults(size int) (*Cluster, error) { return c, nil } +func NewClusterCustom(size int, cb func(index int, config *run.Config)) (*Cluster, error) { + c := Cluster{} + + config := NewConfig() + cb(0, config) + + c.Servers = append(c.Servers, OpenServer(config, "")) + raftURL := c.Servers[0].MetaStore.Addr.String() + + for i := 1; i < size; i++ { + config := NewConfig() + cb(i, config) + c.Servers = append(c.Servers, OpenServer(config, raftURL)) + } + + for _, s := range c.Servers { + configureLogging(s) + } + + if err := verifyCluster(&c, size); err != nil { + return nil, err + } + + return &c, nil +} + // Close shuts down all servers. 
func (c *Cluster) Close() { var wg sync.WaitGroup From 52da1c76d6f2478900eaf2240f2db91ab1d6a73d Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 1 Dec 2015 14:48:00 -0600 Subject: [PATCH 13/15] cluster retention policy tests --- cmd/influxd/run/server_cluster_test.go | 43 +++++++++++++++ cmd/influxd/run/server_suite_test.go | 76 ++++++++++++++++++++++++++ cmd/influxd/run/server_test.go | 76 ++------------------------ 3 files changed, 123 insertions(+), 72 deletions(-) diff --git a/cmd/influxd/run/server_cluster_test.go b/cmd/influxd/run/server_cluster_test.go index 02264e0529..627192c704 100644 --- a/cmd/influxd/run/server_cluster_test.go +++ b/cmd/influxd/run/server_cluster_test.go @@ -5,6 +5,8 @@ import ( "strings" "testing" "time" + + "github.com/influxdb/influxdb/cmd/influxd/run" ) func TestCluster_CreateDatabase(t *testing.T) { @@ -279,3 +281,44 @@ func TestCluster_Query_DropSeriesFromRegex(t *testing.T) { } } } + +func TestCluster_RetentionPolicyCommands(t *testing.T) { + t.Parallel() + + configFunc := func(index int, config *run.Config) { + config.Meta.RetentionAutoCreate = false + } + + c, err := NewClusterCustom(5, configFunc) + + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + test := tests.load(t, "retention_policy_commands") + + s := c.Servers[0] + if _, err := s.MetaStore.CreateDatabase(test.database()); err != nil { + t.Fatal(err) + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } +} diff --git a/cmd/influxd/run/server_suite_test.go b/cmd/influxd/run/server_suite_test.go index f0146de44a..cba5abd17b 100644 --- a/cmd/influxd/run/server_suite_test.go +++ b/cmd/influxd/run/server_suite_test.go @@ -308,6 +308,82 @@ func init() { }, } + tests["retention_policy_commands"] = Test{ + db: "db0", + queries: []*Query{ + &Query{ + name: "create retention policy should succeed", + command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "create retention policy should error if it already exists", + command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{"error":"retention policy already exists"}]}`, + }, + &Query{ + name: "show retention policy should succeed", + command: `SHOW RETENTION POLICIES ON db0`, + exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","1h0m0s",1,false]]}]}]}`, + }, + &Query{ + name: "alter retention policy should succeed", + command: `ALTER RETENTION POLICY rp0 ON db0 DURATION 2h REPLICATION 3 DEFAULT`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "show retention policy should have new altered information", + command: `SHOW RETENTION POLICIES ON db0`, + exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, + }, + &Query{ + name: "dropping default retention policy should not succeed", + command: `DROP RETENTION POLICY rp0 ON db0`, + exp: `{"results":[{"error":"retention policy is default"}]}`, + }, + &Query{ + name: "show retention policy should still show policy", + command: `SHOW RETENTION POLICIES ON db0`, + exp: 
`{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, + }, + &Query{ + name: "create a second non-default retention policy", + command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "show retention policy should show both", + command: `SHOW RETENTION POLICIES ON db0`, + exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true],["rp2","1h0m0s",1,false]]}]}]}`, + }, + &Query{ + name: "dropping non-default retention policy succeed", + command: `DROP RETENTION POLICY rp2 ON db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "show retention policy should show just default", + command: `SHOW RETENTION POLICIES ON db0`, + exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, + }, + &Query{ + name: "Ensure retention policy with unacceptable retention cannot be created", + command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1s REPLICATION 1`, + exp: `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`, + once: true, + }, + &Query{ + name: "Check error when deleting retention policy on non-existent database", + command: `DROP RETENTION POLICY rp1 ON mydatabase`, + exp: `{"results":[{"error":"database not found: mydatabase"}]}`, + }, + }, + } + } func (tests Tests) load(t *testing.T, key string) Test { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index a2344e8b92..790fae85b5 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -205,79 +205,11 @@ func TestServer_RetentionPolicyCommands(t *testing.T) { s := OpenServer(c, "") defer s.Close() - // Create a database. 
- if _, err := s.MetaStore.CreateDatabase("db0"); err != nil { - t.Fatal(err) - } + test := tests.load(t, "retention_policy_commands") - test := Test{ - queries: []*Query{ - &Query{ - name: "create retention policy should succeed", - command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "create retention policy should error if it already exists", - command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`, - exp: `{"results":[{"error":"retention policy already exists"}]}`, - }, - &Query{ - name: "show retention policy should succeed", - command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","1h0m0s",1,false]]}]}]}`, - }, - &Query{ - name: "alter retention policy should succeed", - command: `ALTER RETENTION POLICY rp0 ON db0 DURATION 2h REPLICATION 3 DEFAULT`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show retention policy should have new altered information", - command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, - }, - &Query{ - name: "dropping default retention policy should not succeed", - command: `DROP RETENTION POLICY rp0 ON db0`, - exp: `{"results":[{"error":"retention policy is default"}]}`, - }, - &Query{ - name: "show retention policy should still show policy", - command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, - }, - &Query{ - name: "create a second non-default retention policy", - command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show retention policy should show both", - command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true],["rp2","1h0m0s",1,false]]}]}]}`, - }, - &Query{ - name: "dropping non-default retention policy succeed", - command: `DROP RETENTION POLICY rp2 ON db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show retention policy should show just default", - command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, - }, - &Query{ - name: "Ensure retention policy with unacceptable retention cannot be created", - command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1s REPLICATION 1`, - exp: `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`, - }, - &Query{ - name: "Check error when deleting retention policy on non-existent database", - command: `DROP RETENTION POLICY rp1 ON mydatabase`, - exp: `{"results":[{"error":"database not found: mydatabase"}]}`, - }, - }, + // Create a database. 
+ if _, err := s.MetaStore.CreateDatabase(test.database()); err != nil { + t.Fatal(err) } for _, query := range test.queries { From 0c2588ed43c981d0b66c1860c83540a2d114e055 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 1 Dec 2015 15:16:05 -0600 Subject: [PATCH 14/15] cluster retention auto create test --- cmd/influxd/run/server_cluster_test.go | 31 ++++++++++++++++++++++++++ cmd/influxd/run/server_suite_test.go | 16 +++++++++++++ cmd/influxd/run/server_test.go | 15 +------------ 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/cmd/influxd/run/server_cluster_test.go b/cmd/influxd/run/server_cluster_test.go index 627192c704..9794b0c651 100644 --- a/cmd/influxd/run/server_cluster_test.go +++ b/cmd/influxd/run/server_cluster_test.go @@ -322,3 +322,34 @@ func TestCluster_RetentionPolicyCommands(t *testing.T) { } } } + +func TestCluster_DatabaseRetentionPolicyAutoCreate(t *testing.T) { + t.Parallel() + t.Skip() + c, err := NewCluster(5) + if err != nil { + t.Fatalf("error creating cluster: %s", err) + } + defer c.Close() + + test := tests.load(t, "retention_policy_auto_create") + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + t.Logf("Running %s", query.name) + if query.once { + if _, err := c.Query(query); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + continue + } + if err := c.QueryAll(query); err != nil { + t.Error(query.Error(err)) + } + } +} diff --git a/cmd/influxd/run/server_suite_test.go b/cmd/influxd/run/server_suite_test.go index cba5abd17b..6aebe40470 100644 --- a/cmd/influxd/run/server_suite_test.go +++ b/cmd/influxd/run/server_suite_test.go @@ -384,6 +384,22 @@ func init() { }, } + tests["retention_policy_auto_create"] = Test{ + queries: []*Query{ + &Query{ + name: "create database should succeed", + command: `CREATE DATABASE db0`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "show retention policies should return auto-created policy", + command: `SHOW RETENTION POLICIES ON db0`, + exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`, + }, + }, + } + } func (tests Tests) load(t *testing.T, key string) Test { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 790fae85b5..23695ada0b 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -231,20 +231,7 @@ func TestServer_DatabaseRetentionPolicyAutoCreate(t *testing.T) { s := OpenServer(NewConfig(), "") defer s.Close() - test := Test{ - queries: []*Query{ - &Query{ - name: "create database should succeed", - command: `CREATE DATABASE db0`, - exp: `{"results":[{}]}`, - }, - &Query{ - name: "show retention policies should return auto-created policy", - command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`, - }, - }, - } + test := tests.load(t, "retention_policy_auto_create") for _, query := range test.queries { if query.skip { From edf8e31ee604e6142cd5ac05e83ffb3ab7ad6ba1 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 3 Dec 2015 07:59:31 -0600 Subject: [PATCH 15/15] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a42e3e460..96abded174 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#4841](https://github.com/influxdb/influxdb/pull/4841): Improve 
point parsing speed. Lint models pacakge. Thanks @e-dard! - [#4889](https://github.com/influxdb/influxdb/pull/4889): Implement close notifier and timeout on executors - [#2676](https://github.com/influxdb/influxdb/issues/2676), [#4866](https://github.com/influxdb/influxdb/pull/4866): Add support for specifying default retention policy in database create. Thanks @pires! +- [#4848](https://github.com/influxdb/influxdb/pull/4848): Added framework for cluster integration testing. ### Bugfixes - [#4876](https://github.com/influxdb/influxdb/pull/4876): Complete lint for monitor and services packages. Thanks @e-dard!
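
For readers evaluating the harness introduced in this series, below is a minimal, illustrative sketch (not part of any patch above) of how an additional cluster test could be written against the helpers these patches add in server_helpers_test.go and server_suite_test.go — NewClusterCustom, Query, and QueryAll. The suite key "example_suite" is hypothetical; everything else mirrors the pattern used by TestCluster_RetentionPolicyCommands.

// Illustrative only; assumes a hypothetical "example_suite" entry has been
// registered in the tests map in server_suite_test.go.
func TestCluster_Example(t *testing.T) {
	t.Parallel()

	// Tweak each node's config before its server is opened (added in PATCH 12).
	configFunc := func(index int, config *run.Config) {
		config.Meta.RetentionAutoCreate = false
	}

	c, err := NewClusterCustom(5, configFunc)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "example_suite") // hypothetical suite entry

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			// Statements that may only run once are sent to a single node.
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		// Everything else must return the expected result from every node.
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}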