Merge pull request #5590 from influxdata/er-sub-panic

Fixes #5545
pull/5613/head
Edd Robinson 2016-02-10 10:46:22 +00:00
commit bb9eed941e
4 changed files with 122 additions and 29 deletions

View File

@ -14,6 +14,7 @@
- [#5594](https://github.com/influxdata/influxdb/pull/5594): Fix missing url params on lease redirect - @oldmantaiter
- [#5376](https://github.com/influxdata/influxdb/pull/5376): Fix golint issues in models package. @nuss-justin
- [#5535](https://github.com/influxdata/influxdb/pull/5535): Update README for referring to Collectd
- [#5590](https://github.com/influxdata/influxdb/pull/5590): Fix panic when dropping subscription for unknown retention policy.
## v0.10.0 [2016-02-04]

View File

@ -89,7 +89,7 @@ func (data *Data) setDataNode(nodeID uint64, host, tcpHost string) error {
return nil
}
// DeleteNode removes a node from the metadata.
// DeleteDataNode removes a node from the metadata.
func (data *Data) DeleteDataNode(id uint64) error {
// Node has to be larger than 0 to be real
if id == 0 {
@ -574,8 +574,7 @@ func (data *Data) CreateSubscription(database, rp, name, mode string, destinatio
rpi, err := data.RetentionPolicy(database, rp)
if err != nil {
return err
}
if rpi == nil {
} else if rpi == nil {
return influxdb.ErrRetentionPolicyNotFound(rp)
}
@ -601,6 +600,8 @@ func (data *Data) DropSubscription(database, rp, name string) error {
rpi, err := data.RetentionPolicy(database, rp)
if err != nil {
return err
} else if rpi == nil {
return influxdb.ErrRetentionPolicyNotFound(rp)
}
for i := range rpi.Subscriptions {

View File

@ -69,9 +69,6 @@ var (
// ErrReplicationFactorTooLow is returned when the replication factor is not in an
// acceptable range.
ErrReplicationFactorTooLow = errors.New("replication factor must be greater than 0")
// ErrRetentionPolicyNotFound is returned when an expected retention policy can't be found.
ErrRetentionPolicyNotFound = errors.New("retention policy not found")
)
var (

View File

@ -13,6 +13,8 @@ import (
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tcp"
@ -566,7 +568,7 @@ func TestMetaService_ContinuousQueries(t *testing.T) {
}
}
func TestMetaService_Subscriptions(t *testing.T) {
func TestMetaService_Subscriptions_Create(t *testing.T) {
t.Parallel()
d, s, c := newServiceAndClient()
@ -598,36 +600,128 @@ func TestMetaService_Subscriptions(t *testing.T) {
res := c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`))
if res.Err != nil {
t.Fatal(res.Err)
} else if got, exp := len(res.Series), 1; got != exp {
t.Fatalf("unexpected response.\n\ngot: %d series\nexp: %d\n", got, exp)
}
// Create another subscription.
if res := c.ExecuteStatement(mustParseStatement(`CREATE SUBSCRIPTION sub1 ON db0."default" DESTINATIONS ALL 'udp://example.com:6060'`)); res.Err != nil {
t.Fatal(res.Err)
}
// The subscriptions are correctly created.
if res = c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`)); res.Err != nil {
t.Fatal(res.Err)
}
exp := `{"series":[{"name":"db0","columns":["retention_policy","name","mode","destinations"],"values":[["default","sub0","ALL",["udp://example.com:9090"]],["default","sub1","ALL",["udp://example.com:6060"]]]}]}`
got := mustMarshalJSON(res)
if got != exp {
t.Fatalf("unexpected response.\n\ngot: %s\nexp: %s\n", exp, got)
}
}
func TestMetaService_Subscriptions_Show(t *testing.T) {
t.Parallel()
d, s, c := newServiceAndClient()
defer os.RemoveAll(d)
defer s.Close()
defer c.Close()
// Create a database to use
if res := c.ExecuteStatement(mustParseStatement("CREATE DATABASE db0")); res.Err != nil {
t.Fatal(res.Err)
}
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
// SHOW SUBSCRIPTIONS returns no subscriptions when there are none.
res := c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`))
if res.Err != nil {
t.Fatal(res.Err)
} else if got, exp := len(res.Series), 0; got != exp {
t.Fatalf("got %d series, expected %d", got, exp)
}
// Create a subscription.
if res = c.ExecuteStatement(mustParseStatement(`CREATE SUBSCRIPTION sub0 ON db0."default" DESTINATIONS ALL 'udp://example.com:9090'`)); res.Err != nil {
t.Fatal(res.Err)
}
// SHOW SUBSCRIPTIONS returns the created subscription.
if res = c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`)); res.Err != nil {
t.Fatal(res.Err)
} else if got, exp := len(res.Series), 1; got != exp {
t.Fatalf("got %d series, expected %d", got, exp)
}
exp := `{"series":[{"name":"db0","columns":["retention_policy","name","mode","destinations"],"values":[["default","sub0","ALL",["udp://example.com:9090"]]]}]}`
got := mustMarshalJSON(res)
if exp != got {
t.Fatalf("unexpected response.\n\nexp: %s\ngot: %s\n", exp, got)
if got != exp {
t.Fatalf("unexpected response.\n\ngot: %s\nexp: %s\n", got, exp)
}
}
// Create a couple more subscriptions
if res := c.ExecuteStatement(mustParseStatement(`CREATE SUBSCRIPTION sub1 ON db0."default" DESTINATIONS ALL 'udp://example.com:6060'`)); res.Err != nil {
func TestMetaService_Subscriptions_Drop(t *testing.T) {
t.Parallel()
d, s, c := newServiceAndClient()
defer os.RemoveAll(d)
defer s.Close()
defer c.Close()
// Create a database to use
if res := c.ExecuteStatement(mustParseStatement("CREATE DATABASE db0")); res.Err != nil {
t.Fatal(res.Err)
}
if res := c.ExecuteStatement(mustParseStatement(`CREATE SUBSCRIPTION sub2 ON db0."default" DESTINATIONS ALL 'udp://example.com:7070'`)); res.Err != nil {
db, err := c.Database("db0")
if err != nil {
t.Fatal(err)
} else if db.Name != "db0" {
t.Fatalf("db name wrong: %s", db.Name)
}
// DROP SUBSCRIPTION returns ErrSubscriptionNotFound when the
// subscription is unknown.
res := c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION foo ON db0."default"`))
if got, exp := res.Err, meta.ErrSubscriptionNotFound; got.Error() != exp.Error() {
t.Fatalf("got: %s, exp: %s", got.Error(), exp)
}
// Create a subscription.
if res = c.ExecuteStatement(mustParseStatement(`CREATE SUBSCRIPTION sub0 ON db0."default" DESTINATIONS ALL 'udp://example.com:9090'`)); res.Err != nil {
t.Fatal(res.Err)
}
// Re-create a subscription
if res := c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub1 ON db0."default"`)); res.Err != nil {
t.Fatal(res.Err)
// DROP SUBSCRIPTION returns an influxdb.ErrDatabaseNotFound when
// the database is unknown.
res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON foo."default"`))
if got, exp := res.Err, influxdb.ErrDatabaseNotFound("foo"); got.Error() != exp.Error() {
t.Fatalf("got: %s, exp: %s", got.Error(), exp)
}
res = c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`))
if res.Err != nil {
t.Fatal(res.Err)
// DROP SUBSCRIPTION returns an influxdb.ErrRetentionPolicyNotFound
// when the retention policy is unknown.
res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON db0."foo_policy"`))
if got, exp := res.Err, influxdb.ErrRetentionPolicyNotFound("foo_policy"); got.Error() != exp.Error() {
t.Fatalf("got: %s, exp: %s", got.Error(), exp)
}
exp = `{"series":[{"name":"db0","columns":["retention_policy","name","mode","destinations"],"values":[["default","sub0","ALL",["udp://example.com:9090"]],["default","sub2","ALL",["udp://example.com:7070"]]]}]}`
got = mustMarshalJSON(res)
if exp != got {
t.Fatalf("unexpected response.\n\nexp: %s\ngot: %s\n", exp, got)
// DROP SUBSCRIPTION drops the subsciption if it can find it.
res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON db0."default"`))
if got := res.Err; got != nil {
t.Fatalf("got: %s, exp: %v", got.Error(), nil)
}
if res = c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`)); res.Err != nil {
t.Fatal(res.Err)
} else if got, exp := len(res.Series), 0; got != exp {
t.Fatalf("got %d series, expected %d", got, exp)
}
}
@ -778,7 +872,7 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) {
cfgs := make([]*meta.Config, 3)
srvs := make([]*testService, 3)
for i, _ := range cfgs {
for i := range cfgs {
c := newConfig()
cfgs[i] = c
@ -821,7 +915,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) {
cfgs := make([]*meta.Config, 3)
srvs := make([]*testService, 3)
for i, _ := range cfgs {
for i := range cfgs {
c := newConfig()
cfgs[i] = c
@ -1103,18 +1197,18 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) {
}
defer c.Close()
id_after := c.ClusterID()
if id_after == 0 {
idAfter := c.ClusterID()
if idAfter == 0 {
t.Fatal("cluster ID can't be zero")
} else if id_after != id {
t.Fatalf("cluster id not the same: %d, %d", id_after, id)
} else if idAfter != id {
t.Fatalf("cluster id not the same: %d, %d", idAfter, id)
}
}
func TestMetaService_Ping(t *testing.T) {
cfgs := make([]*meta.Config, 3)
srvs := make([]*testService, 3)
for i, _ := range cfgs {
for i := range cfgs {
c := newConfig()
cfgs[i] = c