Merge pull request #4848 from influxdb/cluster-integration
Added framework for cluster integration testingpull/4905/merge
commit
67ea0b757b
|
@ -6,6 +6,7 @@
|
|||
- [#4841](https://github.com/influxdb/influxdb/pull/4841): Improve point parsing speed. Lint models pacakge. Thanks @e-dard!
|
||||
- [#4889](https://github.com/influxdb/influxdb/pull/4889): Implement close notifier and timeout on executors
|
||||
- [#2676](https://github.com/influxdb/influxdb/issues/2676), [#4866](https://github.com/influxdb/influxdb/pull/4866): Add support for specifying default retention policy in database create. Thanks @pires!
|
||||
- [#4848](https://github.com/influxdb/influxdb/pull/4848): Added framework for cluster integration testing.
|
||||
|
||||
### Bugfixes
|
||||
- [#4876](https://github.com/influxdb/influxdb/pull/4876): Complete lint for monitor and services packages. Thanks @e-dard!
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
BUILD_DIR=$HOME/influxdb-build
|
||||
GO_VERSION=go1.4.2
|
||||
PARALLELISM="-parallel 256"
|
||||
PARALLELISM="-parallel 1"
|
||||
TIMEOUT="-timeout 480s"
|
||||
|
||||
# Executes the given statement, and exits if the command returns a non-zero code.
|
||||
|
|
|
@ -363,6 +363,7 @@ func (s *Server) Open() error {
|
|||
// The port 0 is used, we need to retrieve the port assigned by the kernel
|
||||
if strings.HasSuffix(s.BindAddress, ":0") {
|
||||
s.MetaStore.Addr = ln.Addr()
|
||||
s.MetaStore.RemoteAddr = ln.Addr()
|
||||
}
|
||||
|
||||
// Multiplex listener.
|
||||
|
|
|
@ -0,0 +1,355 @@
|
|||
package run_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cmd/influxd/run"
|
||||
)
|
||||
|
||||
func TestCluster_CreateDatabase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c, err := NewClusterWithDefaults(5)
|
||||
defer c.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_Write(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c, err := NewClusterWithDefaults(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
}
|
||||
|
||||
_, err = c.Servers[0].Write("db0", "default", strings.Join(writes, "\n"), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
q := &Query{
|
||||
name: "write",
|
||||
command: `SELECT * FROM db0."default".cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
|
||||
}
|
||||
err = c.QueryAll(q)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_DatabaseCommands(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, err := NewCluster(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "database_commands")
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, err := NewCluster(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "drop_and_recreate_database")
|
||||
|
||||
s := c.Servers[0]
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = c.Servers[0].Write(test.database(), test.retentionPolicy(), test.write, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_Query_DropDatabaseIsolated(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, err := NewCluster(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "drop_database_isolated")
|
||||
|
||||
s := c.Servers[0]
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp1", 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_Query_DropAndRecreateSeries(t *testing.T) {
|
||||
t.Parallel()
|
||||
t.Skip()
|
||||
c, err := NewCluster(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "drop_and_recreate_series")
|
||||
|
||||
s := c.Servers[0]
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Fatal(query.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Re-write data and test again.
|
||||
retest := tests.load(t, "drop_and_recreate_series_retest")
|
||||
|
||||
_, err = s.Write(retest.database(), retest.retentionPolicy(), retest.write, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, query := range retest.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_Query_DropSeriesFromRegex(t *testing.T) {
|
||||
t.Parallel()
|
||||
t.Skip()
|
||||
c, err := NewCluster(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "drop_series_from_regex")
|
||||
|
||||
s := c.Servers[0]
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_RetentionPolicyCommands(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
configFunc := func(index int, config *run.Config) {
|
||||
config.Meta.RetentionAutoCreate = false
|
||||
}
|
||||
|
||||
c, err := NewClusterCustom(5, configFunc)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "retention_policy_commands")
|
||||
|
||||
s := c.Servers[0]
|
||||
if _, err := s.MetaStore.CreateDatabase(test.database()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCluster_DatabaseRetentionPolicyAutoCreate(t *testing.T) {
|
||||
t.Parallel()
|
||||
t.Skip()
|
||||
c, err := NewCluster(5)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating cluster: %s", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
test := tests.load(t, "retention_policy_auto_create")
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
t.Logf("Running %s", query.name)
|
||||
if query.once {
|
||||
if _, err := c.Query(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := c.QueryAll(query); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,15 +14,19 @@ import (
|
|||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/client/v2"
|
||||
"github.com/influxdb/influxdb/cmd/influxd/run"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/services/httpd"
|
||||
"github.com/influxdb/influxdb/toml"
|
||||
)
|
||||
|
||||
const emptyResults = `{"results":[{}]}`
|
||||
|
||||
// Server represents a test wrapper for run.Server.
|
||||
type Server struct {
|
||||
*run.Server
|
||||
|
@ -48,6 +52,9 @@ func NewServer(c *run.Config) *Server {
|
|||
|
||||
// OpenServer opens a test server.
|
||||
func OpenServer(c *run.Config, joinURLs string) *Server {
|
||||
if len(joinURLs) > 0 {
|
||||
c.Meta.Peers = strings.Split(joinURLs, ",")
|
||||
}
|
||||
s := NewServer(c)
|
||||
configureLogging(s)
|
||||
if err := s.Open(); err != nil {
|
||||
|
@ -123,11 +130,14 @@ func (s *Server) Query(query string) (results string, err error) {
|
|||
|
||||
// Query executes a query against the server and returns the results.
|
||||
func (s *Server) QueryWithParams(query string, values url.Values) (results string, err error) {
|
||||
var v url.Values
|
||||
if values == nil {
|
||||
values = url.Values{}
|
||||
v = url.Values{}
|
||||
} else {
|
||||
v, _ = url.ParseQuery(values.Encode())
|
||||
}
|
||||
values.Set("q", query)
|
||||
return s.HTTPGet(s.URL() + "/query?" + values.Encode())
|
||||
v.Set("q", query)
|
||||
return s.HTTPGet(s.URL() + "/query?" + v.Encode())
|
||||
}
|
||||
|
||||
// HTTPGet makes an HTTP GET request to the server and returns the response.
|
||||
|
@ -294,6 +304,7 @@ type Query struct {
|
|||
pattern bool
|
||||
skip bool
|
||||
repeat int
|
||||
once bool
|
||||
}
|
||||
|
||||
// Execute runs the command and returns an err if it fails
|
||||
|
@ -321,6 +332,8 @@ func (q *Query) failureMessage() string {
|
|||
return fmt.Sprintf("%s: unexpected results\nquery: %s\nexp: %s\nactual: %s\n", q.name, q.command, q.exp, q.act)
|
||||
}
|
||||
|
||||
type Tests map[string]Test
|
||||
|
||||
type Test struct {
|
||||
initialized bool
|
||||
write string
|
||||
|
@ -338,15 +351,57 @@ func NewTest(db, rp string) Test {
|
|||
}
|
||||
}
|
||||
|
||||
func (t Test) duplicate() Test {
|
||||
test := Test{
|
||||
initialized: t.initialized,
|
||||
write: t.write,
|
||||
params: t.params,
|
||||
db: t.db,
|
||||
rp: t.rp,
|
||||
exp: t.exp,
|
||||
queries: make([]*Query, len(t.queries)),
|
||||
}
|
||||
copy(test.queries, t.queries)
|
||||
return test
|
||||
}
|
||||
|
||||
func (t *Test) addWrite(s ...string) {
|
||||
if len(t.write) > 0 {
|
||||
t.write += "\n"
|
||||
}
|
||||
t.write = strings.Join(s, "\n")
|
||||
}
|
||||
|
||||
func (t *Test) addQueries(q ...*Query) {
|
||||
t.queries = append(t.queries, q...)
|
||||
}
|
||||
|
||||
func (t *Test) database() string {
|
||||
if t.db != "" {
|
||||
return t.db
|
||||
}
|
||||
return "db0"
|
||||
}
|
||||
|
||||
func (t *Test) retentionPolicy() string {
|
||||
if t.rp != "" {
|
||||
return t.rp
|
||||
}
|
||||
return "default"
|
||||
}
|
||||
|
||||
func (t *Test) init(s *Server) error {
|
||||
if t.write == "" || t.initialized {
|
||||
return nil
|
||||
}
|
||||
t.initialized = true
|
||||
if t.db == "" {
|
||||
t.db = "db0"
|
||||
}
|
||||
if t.rp == "" {
|
||||
t.rp = "rp0"
|
||||
}
|
||||
|
||||
if res, err := s.Write(t.db, t.rp, t.write, t.params); err != nil {
|
||||
return err
|
||||
} else if t.exp != res {
|
||||
|
@ -362,7 +417,7 @@ func configureLogging(s *Server) {
|
|||
SetLogger(*log.Logger)
|
||||
}
|
||||
nullLogger := log.New(ioutil.Discard, "", 0)
|
||||
s.MetaStore.Logger = nullLogger
|
||||
s.MetaStore.SetLogger(nullLogger)
|
||||
s.TSDBStore.Logger = nullLogger
|
||||
s.HintedHandoff.SetLogger(nullLogger)
|
||||
s.Monitor.SetLogger(nullLogger)
|
||||
|
@ -375,3 +430,216 @@ func configureLogging(s *Server) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
Servers []*Server
|
||||
}
|
||||
|
||||
func NewCluster(size int) (*Cluster, error) {
|
||||
c := Cluster{}
|
||||
c.Servers = append(c.Servers, OpenServer(NewConfig(), ""))
|
||||
raftURL := c.Servers[0].MetaStore.Addr.String()
|
||||
|
||||
for i := 1; i < size; i++ {
|
||||
c.Servers = append(c.Servers, OpenServer(NewConfig(), raftURL))
|
||||
}
|
||||
|
||||
for _, s := range c.Servers {
|
||||
configureLogging(s)
|
||||
}
|
||||
|
||||
if err := verifyCluster(&c, size); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func verifyCluster(c *Cluster, size int) error {
|
||||
r, err := c.Servers[0].Query("SHOW SERVERS")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var cl client.Response
|
||||
if e := json.Unmarshal([]byte(r), &cl); e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
var leaderCount int
|
||||
var raftCount int
|
||||
|
||||
for _, result := range cl.Results {
|
||||
for _, series := range result.Series {
|
||||
for i, value := range series.Values {
|
||||
addr := c.Servers[i].MetaStore.Addr.String()
|
||||
if value[0].(float64) != float64(i+1) {
|
||||
return fmt.Errorf("expected nodeID %d, got %v", i, value[0])
|
||||
}
|
||||
if value[1].(string) != addr {
|
||||
return fmt.Errorf("expected addr %s, got %v", addr, value[1])
|
||||
}
|
||||
if value[2].(bool) {
|
||||
raftCount++
|
||||
}
|
||||
if value[3].(bool) {
|
||||
leaderCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if leaderCount != 1 {
|
||||
return fmt.Errorf("expected 1 leader, got %d", leaderCount)
|
||||
}
|
||||
if size < 3 && raftCount != size {
|
||||
return fmt.Errorf("expected %d raft nodes, got %d", size, raftCount)
|
||||
}
|
||||
if size >= 3 && raftCount != 3 {
|
||||
return fmt.Errorf("expected 3 raft nodes, got %d", raftCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewClusterWithDefaults(size int) (*Cluster, error) {
|
||||
c, err := NewCluster(size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r, err := c.Query(&Query{command: "CREATE DATABASE db0"})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r != emptyResults {
|
||||
return nil, fmt.Errorf("%s", r)
|
||||
}
|
||||
|
||||
for i, s := range c.Servers {
|
||||
got, err := s.Query("SHOW DATABASES")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query databases on node %d for show databases", i+1)
|
||||
}
|
||||
if exp := `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`; got != exp {
|
||||
return nil, fmt.Errorf("unexpected result node %d\nexp: %s\ngot: %s\n", i+1, exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func NewClusterCustom(size int, cb func(index int, config *run.Config)) (*Cluster, error) {
|
||||
c := Cluster{}
|
||||
|
||||
config := NewConfig()
|
||||
cb(0, config)
|
||||
|
||||
c.Servers = append(c.Servers, OpenServer(config, ""))
|
||||
raftURL := c.Servers[0].MetaStore.Addr.String()
|
||||
|
||||
for i := 1; i < size; i++ {
|
||||
config := NewConfig()
|
||||
cb(i, config)
|
||||
c.Servers = append(c.Servers, OpenServer(config, raftURL))
|
||||
}
|
||||
|
||||
for _, s := range c.Servers {
|
||||
configureLogging(s)
|
||||
}
|
||||
|
||||
if err := verifyCluster(&c, size); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
// Close shuts down all servers.
|
||||
func (c *Cluster) Close() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(c.Servers))
|
||||
|
||||
for _, s := range c.Servers {
|
||||
go func(s *Server) {
|
||||
defer wg.Done()
|
||||
s.Close()
|
||||
}(s)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (c *Cluster) Query(q *Query) (string, error) {
|
||||
r, e := c.Servers[0].Query(q.command)
|
||||
q.act = r
|
||||
return r, e
|
||||
}
|
||||
|
||||
func (c *Cluster) QueryIndex(index int, q string) (string, error) {
|
||||
return c.Servers[index].Query(q)
|
||||
}
|
||||
|
||||
func (c *Cluster) QueryAll(q *Query) error {
|
||||
type Response struct {
|
||||
Val string
|
||||
Err error
|
||||
}
|
||||
|
||||
timeoutErr := fmt.Errorf("timed out waiting for response")
|
||||
|
||||
queryAll := func() error {
|
||||
// if a server doesn't return in 5 seconds, fail the response
|
||||
timeout := time.After(5 * time.Second)
|
||||
ch := make(chan Response, 0)
|
||||
|
||||
for _, s := range c.Servers {
|
||||
go func(s *Server) {
|
||||
r, err := s.QueryWithParams(q.command, q.params)
|
||||
ch <- Response{Val: r, Err: err}
|
||||
}(s)
|
||||
}
|
||||
|
||||
resps := []Response{}
|
||||
for i := 0; i < len(c.Servers); i++ {
|
||||
select {
|
||||
case r := <-ch:
|
||||
resps = append(resps, r)
|
||||
case <-timeout:
|
||||
return timeoutErr
|
||||
}
|
||||
}
|
||||
|
||||
for _, r := range resps {
|
||||
if r.Err != nil {
|
||||
return r.Err
|
||||
}
|
||||
if q.pattern {
|
||||
if !expectPattern(q.exp, r.Val) {
|
||||
return fmt.Errorf("unexpected pattern: \n\texp: %s\n\tgot: %s\n", q.exp, r.Val)
|
||||
}
|
||||
} else {
|
||||
if r.Val != q.exp {
|
||||
return fmt.Errorf("unexpected value:\n\texp: %s\n\tgot: %s\n", q.exp, r.Val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
tick := time.Tick(100 * time.Millisecond)
|
||||
// if we don't reach consensus in 20 seconds, fail the query
|
||||
timeout := time.After(20 * time.Second)
|
||||
|
||||
if err := queryAll(); err == nil {
|
||||
return nil
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
if err := queryAll(); err == nil {
|
||||
return nil
|
||||
}
|
||||
case <-timeout:
|
||||
return fmt.Errorf("timed out waiting for response")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,412 @@
|
|||
package run_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var tests Tests
|
||||
|
||||
// Load all shared tests
|
||||
func init() {
|
||||
tests = make(map[string]Test)
|
||||
|
||||
tests["database_commands"] = Test{
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "create database should succeed",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention duration should succeed",
|
||||
command: `CREATE DATABASE db0_r WITH DURATION 24h REPLICATION 2 NAME db0_r_policy`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error with bad name",
|
||||
command: `CREATE DATABASE 0xdb0`,
|
||||
exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention duration should error with bad retention duration",
|
||||
command: `CREATE DATABASE db0 WITH DURATION xyz`,
|
||||
exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 35"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention replication should error with bad retention replication number",
|
||||
command: `CREATE DATABASE db0 WITH REPLICATION xyz`,
|
||||
exp: `{"error":"error parsing query: found xyz, expected number at line 1, char 38"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention name should error with missing retention name",
|
||||
command: `CREATE DATABASE db0 WITH NAME`,
|
||||
exp: `{"error":"error parsing query: found EOF, expected identifier at line 1, char 31"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should succeed",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error if it already exists",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{"error":"database already exists"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error if it already exists",
|
||||
command: `CREATE DATABASE db0_r`,
|
||||
exp: `{"results":[{"error":"database already exists"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should not error with existing database with IF NOT EXISTS",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should create non-existing database with IF NOT EXISTS",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention duration should not error with existing database with IF NOT EXISTS",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION 24h`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error IF NOT EXISTS with bad retention duration",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION xyz`,
|
||||
exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 49"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should succeed",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database db0 should succeed",
|
||||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database db0_r should succeed",
|
||||
command: `DROP DATABASE db0_r`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database db1 should succeed",
|
||||
command: `DROP DATABASE db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database should error if it does not exists",
|
||||
command: `DROP DATABASE db1`,
|
||||
exp: `{"results":[{"error":"database not found: db1"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database should not error with non-existing database db1 WITH IF EXISTS",
|
||||
command: `DROP DATABASE IF EXISTS db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should have no results",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database should error if it doesn't exist",
|
||||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{"error":"database not found: db0"}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["drop_and_recreate_database"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "Drop database after data write",
|
||||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Recreate database",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Recreate retention policy",
|
||||
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 365d REPLICATION 1 DEFAULT`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Show measurements after recreate",
|
||||
command: `SHOW MEASUREMENTS`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Query data after recreate",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["drop_database_isolated"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "Query data from 1st database",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Query data from 1st database with GROUP BY *",
|
||||
command: `SELECT * FROM cpu GROUP BY *`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop other database",
|
||||
command: `DROP DATABASE db1`,
|
||||
once: true,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Query data from 1st database and ensure it's still there",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Query data from 1st database and ensure it's still there with GROUP BY *",
|
||||
command: `SELECT * FROM cpu GROUP BY *`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["drop_and_recreate_series"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series after data write",
|
||||
command: `DROP SERIES FROM cpu`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Show series is gone",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
tests["drop_and_recreate_series_retest"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
write: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "Show series is present again after re-write",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["drop_series_from_regex"] = Test{
|
||||
db: "db0",
|
||||
rp: "rp0",
|
||||
write: strings.Join([]string{
|
||||
fmt.Sprintf(`a,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`aa,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`b,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`c,host=serverA,region=uswest val=30.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
}, "\n"),
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series after data write",
|
||||
command: `DROP SERIES FROM /a.*/`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Show series is gone",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series from regex that matches no measurements",
|
||||
command: `DROP SERIES FROM /a.*/`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "make sure DROP SERIES doesn't delete anything when regex doesn't match",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series with WHERE field should error",
|
||||
command: `DROP SERIES FROM c WHERE val > 50.0`,
|
||||
exp: `{"results":[{"error":"DROP SERIES doesn't support fields in WHERE clause"}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "make sure DROP SERIES with field in WHERE didn't delete data",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series with WHERE time should error",
|
||||
command: `DROP SERIES FROM c WHERE time > now() - 1d`,
|
||||
exp: `{"results":[{"error":"DROP SERIES doesn't support time in WHERE clause"}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["retention_policy_commands"] = Test{
|
||||
db: "db0",
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "create retention policy should succeed",
|
||||
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "create retention policy should error if it already exists",
|
||||
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`,
|
||||
exp: `{"results":[{"error":"retention policy already exists"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should succeed",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","1h0m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "alter retention policy should succeed",
|
||||
command: `ALTER RETENTION POLICY rp0 ON db0 DURATION 2h REPLICATION 3 DEFAULT`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should have new altered information",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "dropping default retention policy should not succeed",
|
||||
command: `DROP RETENTION POLICY rp0 ON db0`,
|
||||
exp: `{"results":[{"error":"retention policy is default"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should still show policy",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create a second non-default retention policy",
|
||||
command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show both",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true],["rp2","1h0m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "dropping non-default retention policy succeed",
|
||||
command: `DROP RETENTION POLICY rp2 ON db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show just default",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Ensure retention policy with unacceptable retention cannot be created",
|
||||
command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1s REPLICATION 1`,
|
||||
exp: `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "Check error when deleting retention policy on non-existent database",
|
||||
command: `DROP RETENTION POLICY rp1 ON mydatabase`,
|
||||
exp: `{"results":[{"error":"database not found: mydatabase"}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests["retention_policy_auto_create"] = Test{
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "create database should succeed",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policies should return auto-created policy",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (tests Tests) load(t *testing.T, key string) Test {
|
||||
test, ok := tests[key]
|
||||
if !ok {
|
||||
t.Fatalf("no test %q", key)
|
||||
}
|
||||
|
||||
return test.duplicate()
|
||||
}
|
|
@ -31,115 +31,7 @@ func TestServer_DatabaseCommands(t *testing.T) {
|
|||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
test := Test{
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "create database should succeed",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention duration should succeed",
|
||||
command: `CREATE DATABASE db0_r WITH DURATION 24h REPLICATION 2 NAME db0_r_policy`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error with bad name",
|
||||
command: `CREATE DATABASE 0xdb0`,
|
||||
exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention duration should error with bad retention duration",
|
||||
command: `CREATE DATABASE db0 WITH DURATION xyz`,
|
||||
exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 35"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention replication should error with bad retention replication number",
|
||||
command: `CREATE DATABASE db0 WITH REPLICATION xyz`,
|
||||
exp: `{"error":"error parsing query: found xyz, expected number at line 1, char 38"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention name should error with missing retention name",
|
||||
command: `CREATE DATABASE db0 WITH NAME`,
|
||||
exp: `{"error":"error parsing query: found EOF, expected identifier at line 1, char 31"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should succeed",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error if it already exists",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{"error":"database already exists"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error if it already exists",
|
||||
command: `CREATE DATABASE db0_r`,
|
||||
exp: `{"results":[{"error":"database already exists"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should not error with existing database with IF NOT EXISTS",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should create non-existing database with IF NOT EXISTS",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database with retention duration should not error with existing database with IF NOT EXISTS",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION 24h`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error IF NOT EXISTS with bad retention duration",
|
||||
command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION xyz`,
|
||||
exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 49"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should succeed",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database db0 should succeed",
|
||||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database db0_r should succeed",
|
||||
command: `DROP DATABASE db0_r`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database db1 should succeed",
|
||||
command: `DROP DATABASE db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database should error if it does not exists",
|
||||
command: `DROP DATABASE db1`,
|
||||
exp: `{"results":[{"error":"database not found: db1"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database should not error with non-existing database db1 WITH IF EXISTS",
|
||||
command: `DROP DATABASE IF EXISTS db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should have no results",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop database should error if it doesn't exist",
|
||||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{"error":"database not found: db0"}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
test := tests.load(t, "database_commands")
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
|
@ -159,50 +51,15 @@ func TestServer_Query_DropAndRecreateDatabase(t *testing.T) {
|
|||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
test := tests.load(t, "drop_and_recreate_database")
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = strings.Join(writes, "\n")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "Drop database after data write",
|
||||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Recreate database",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Recreate retention policy",
|
||||
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 365d REPLICATION 1 DEFAULT`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Show measurements after recreate",
|
||||
command: `SHOW MEASUREMENTS`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Query data after recreate",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
|
@ -226,55 +83,18 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) {
|
|||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
test := tests.load(t, "drop_database_isolated")
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp1", 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = strings.Join(writes, "\n")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "Query data from 1st database",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Query data from 1st database with GROUP BY *",
|
||||
command: `SELECT * FROM cpu GROUP BY *`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop other database",
|
||||
command: `DROP DATABASE db1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Query data from 1st database and ensure it's still there",
|
||||
command: `SELECT * FROM cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Query data from 1st database and ensure it's still there with GROUP BY *",
|
||||
command: `SELECT * FROM cpu GROUP BY *`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
|
@ -298,41 +118,15 @@ func TestServer_Query_DropAndRecreateSeries(t *testing.T) {
|
|||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
test := tests.load(t, "drop_and_recreate_series")
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = strings.Join(writes, "\n")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series after data write",
|
||||
command: `DROP SERIES FROM cpu`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Show series is gone",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
|
@ -351,21 +145,11 @@ func TestServer_Query_DropAndRecreateSeries(t *testing.T) {
|
|||
}
|
||||
|
||||
// Re-write data and test again.
|
||||
reTest := NewTest("db0", "rp0")
|
||||
reTest.write = strings.Join(writes, "\n")
|
||||
retest := tests.load(t, "drop_and_recreate_series_retest")
|
||||
|
||||
reTest.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "Show series is present again after re-write",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range reTest.queries {
|
||||
for i, query := range retest.queries {
|
||||
if i == 0 {
|
||||
if err := reTest.init(s); err != nil {
|
||||
if err := retest.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -386,74 +170,15 @@ func TestServer_Query_DropSeriesFromRegex(t *testing.T) {
|
|||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
test := tests.load(t, "drop_series_from_regex")
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`a,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`aa,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`b,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`c,host=serverA,region=uswest val=30.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = strings.Join(writes, "\n")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series after data write",
|
||||
command: `DROP SERIES FROM /a.*/`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Show series is gone",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series from regex that matches no measurements",
|
||||
command: `DROP SERIES FROM /a.*/`,
|
||||
exp: `{"results":[{}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "make sure DROP SERIES doesn't delete anything when regex doesn't match",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series with WHERE field should error",
|
||||
command: `DROP SERIES FROM c WHERE val > 50.0`,
|
||||
exp: `{"results":[{"error":"DROP SERIES doesn't support fields in WHERE clause"}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "make sure DROP SERIES with field in WHERE didn't delete data",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "Drop series with WHERE time should error",
|
||||
command: `DROP SERIES FROM c WHERE time > now() - 1d`,
|
||||
exp: `{"results":[{"error":"DROP SERIES doesn't support time in WHERE clause"}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
|
@ -480,79 +205,11 @@ func TestServer_RetentionPolicyCommands(t *testing.T) {
|
|||
s := OpenServer(c, "")
|
||||
defer s.Close()
|
||||
|
||||
// Create a database.
|
||||
if _, err := s.MetaStore.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
test := tests.load(t, "retention_policy_commands")
|
||||
|
||||
test := Test{
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "create retention policy should succeed",
|
||||
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create retention policy should error if it already exists",
|
||||
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`,
|
||||
exp: `{"results":[{"error":"retention policy already exists"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should succeed",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","1h0m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "alter retention policy should succeed",
|
||||
command: `ALTER RETENTION POLICY rp0 ON db0 DURATION 2h REPLICATION 3 DEFAULT`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should have new altered information",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "dropping default retention policy should not succeed",
|
||||
command: `DROP RETENTION POLICY rp0 ON db0`,
|
||||
exp: `{"results":[{"error":"retention policy is default"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should still show policy",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create a second non-default retention policy",
|
||||
command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show both",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true],["rp2","1h0m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "dropping non-default retention policy succeed",
|
||||
command: `DROP RETENTION POLICY rp2 ON db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show just default",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Ensure retention policy with unacceptable retention cannot be created",
|
||||
command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1s REPLICATION 1`,
|
||||
exp: `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Check error when deleting retention policy on non-existent database",
|
||||
command: `DROP RETENTION POLICY rp1 ON mydatabase`,
|
||||
exp: `{"results":[{"error":"database not found"}]}`,
|
||||
},
|
||||
},
|
||||
// Create a database.
|
||||
if _, err := s.MetaStore.CreateDatabase(test.database()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
|
@ -574,20 +231,7 @@ func TestServer_DatabaseRetentionPolicyAutoCreate(t *testing.T) {
|
|||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
test := Test{
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "create database should succeed",
|
||||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policies should return auto-created policy",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
test := tests.load(t, "retention_policy_auto_create")
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
|
@ -1635,7 +1279,7 @@ func TestServer_Query_Common(t *testing.T) {
|
|||
&Query{
|
||||
name: "selecting a from a non-existent retention policy should error",
|
||||
command: `SELECT value FROM db0.rp1.cpu`,
|
||||
exp: `{"results":[{"error":"retention policy not found"}]}`,
|
||||
exp: `{"results":[{"error":"retention policy not found: rp1"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "selecting a valid measurement and field should succeed",
|
||||
|
@ -3313,10 +2957,6 @@ func TestServer_Query_TopInt(t *testing.T) {
|
|||
continue
|
||||
}
|
||||
|
||||
println(">>>>", query.name)
|
||||
if query.name != `top - memory - host tag with limit 2` { // FIXME: temporary
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
|
|
35
meta/data.go
35
meta/data.go
|
@ -5,6 +5,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta/internal"
|
||||
)
|
||||
|
@ -174,14 +175,14 @@ func (data *Data) DropDatabase(name string) error {
|
|||
return nil
|
||||
}
|
||||
}
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(name)
|
||||
}
|
||||
|
||||
// RetentionPolicy returns a retention policy for a database by name.
|
||||
func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) {
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return nil, ErrDatabaseNotFound
|
||||
return nil, influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
for i := range di.RetentionPolicies {
|
||||
|
@ -205,7 +206,7 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
|
|||
// Find database.
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
} else if di.RetentionPolicy(rpi.Name) != nil {
|
||||
return ErrRetentionPolicyExists
|
||||
}
|
||||
|
@ -226,7 +227,7 @@ func (data *Data) DropRetentionPolicy(database, name string) error {
|
|||
// Find database.
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
// Prohibit dropping the default retention policy.
|
||||
|
@ -241,7 +242,7 @@ func (data *Data) DropRetentionPolicy(database, name string) error {
|
|||
return nil
|
||||
}
|
||||
}
|
||||
return ErrRetentionPolicyNotFound
|
||||
return influxdb.ErrRetentionPolicyNotFound(name)
|
||||
}
|
||||
|
||||
// UpdateRetentionPolicy updates an existing retention policy.
|
||||
|
@ -249,13 +250,13 @@ func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPol
|
|||
// Find database.
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
// Find policy.
|
||||
rpi := di.RetentionPolicy(name)
|
||||
if rpi == nil {
|
||||
return ErrRetentionPolicyNotFound
|
||||
return influxdb.ErrRetentionPolicyNotFound(name)
|
||||
}
|
||||
|
||||
// Ensure new policy doesn't match an existing policy.
|
||||
|
@ -288,9 +289,9 @@ func (data *Data) SetDefaultRetentionPolicy(database, name string) error {
|
|||
// Find database and verify policy exists.
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
} else if di.RetentionPolicy(name) == nil {
|
||||
return ErrRetentionPolicyNotFound
|
||||
return influxdb.ErrRetentionPolicyNotFound(name)
|
||||
}
|
||||
|
||||
// Set default policy.
|
||||
|
@ -306,7 +307,7 @@ func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
} else if rpi == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
return nil, influxdb.ErrRetentionPolicyNotFound(policy)
|
||||
}
|
||||
groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
|
||||
for _, g := range rpi.ShardGroups {
|
||||
|
@ -326,7 +327,7 @@ func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax tim
|
|||
if err != nil {
|
||||
return nil, err
|
||||
} else if rpi == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
return nil, influxdb.ErrRetentionPolicyNotFound(policy)
|
||||
}
|
||||
groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
|
||||
for _, g := range rpi.ShardGroups {
|
||||
|
@ -345,7 +346,7 @@ func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.
|
|||
if err != nil {
|
||||
return nil, err
|
||||
} else if rpi == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
return nil, influxdb.ErrRetentionPolicyNotFound(policy)
|
||||
}
|
||||
|
||||
return rpi.ShardGroupByTimestamp(timestamp), nil
|
||||
|
@ -363,7 +364,7 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
|
|||
if err != nil {
|
||||
return err
|
||||
} else if rpi == nil {
|
||||
return ErrRetentionPolicyNotFound
|
||||
return influxdb.ErrRetentionPolicyNotFound(policy)
|
||||
}
|
||||
|
||||
// Verify that shard group doesn't already exist for this timestamp.
|
||||
|
@ -426,7 +427,7 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
|
|||
if err != nil {
|
||||
return err
|
||||
} else if rpi == nil {
|
||||
return ErrRetentionPolicyNotFound
|
||||
return influxdb.ErrRetentionPolicyNotFound(policy)
|
||||
}
|
||||
|
||||
// Find shard group by ID and set its deletion timestamp.
|
||||
|
@ -444,7 +445,7 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
|
|||
func (data *Data) CreateContinuousQuery(database, name, query string) error {
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
// Ensure the name doesn't already exist.
|
||||
|
@ -467,7 +468,7 @@ func (data *Data) CreateContinuousQuery(database, name, query string) error {
|
|||
func (data *Data) DropContinuousQuery(database, name string) error {
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
for i := range di.ContinuousQueries {
|
||||
|
@ -486,7 +487,7 @@ func (data *Data) CreateSubscription(database, rp, name, mode string, destinatio
|
|||
return err
|
||||
}
|
||||
if rpi == nil {
|
||||
return ErrRetentionPolicyNotFound
|
||||
return influxdb.ErrRetentionPolicyNotFound(rp)
|
||||
}
|
||||
|
||||
// Ensure the name doesn't already exist.
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/meta/internal"
|
||||
|
@ -183,7 +184,8 @@ func TestData_CreateRetentionPolicy_ErrReplicationFactorTooLow(t *testing.T) {
|
|||
// Ensure that creating a retention policy on a non-existent database returns an error.
|
||||
func TestData_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
|
||||
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}}}
|
||||
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != meta.ErrDatabaseNotFound {
|
||||
expErr := influxdb.ErrDatabaseNotFound("db0")
|
||||
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err.Error() != expErr.Error() {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -249,7 +251,8 @@ func TestData_DropRetentionPolicy(t *testing.T) {
|
|||
// Ensure an error is returned when deleting a policy from a non-existent database.
|
||||
func TestData_DropRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
|
||||
var data meta.Data
|
||||
if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound {
|
||||
expErr := influxdb.ErrDatabaseNotFound("db0")
|
||||
if err := data.DropRetentionPolicy("db0", "rp0"); err.Error() != expErr.Error() {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -260,7 +263,8 @@ func TestData_DropRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
|
|||
if err := data.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrRetentionPolicyNotFound {
|
||||
expErr := influxdb.ErrRetentionPolicyNotFound("rp0")
|
||||
if err := data.DropRetentionPolicy("db0", "rp0"); err.Error() != expErr.Error() {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -290,7 +294,8 @@ func TestData_RetentionPolicy(t *testing.T) {
|
|||
// Ensure that retrieving a policy from a non-existent database returns an error.
|
||||
func TestData_RetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
|
||||
var data meta.Data
|
||||
if _, err := data.RetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound {
|
||||
expErr := influxdb.ErrDatabaseNotFound("db0")
|
||||
if _, err := data.RetentionPolicy("db0", "rp0"); err.Error() != expErr.Error() {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,6 @@ var (
|
|||
// ErrDatabaseExists is returned when creating an already existing database.
|
||||
ErrDatabaseExists = newError("database already exists")
|
||||
|
||||
// ErrDatabaseNotFound is returned when mutating a database that doesn't exist.
|
||||
ErrDatabaseNotFound = newError("database not found")
|
||||
|
||||
// ErrDatabaseNameRequired is returned when creating a database without a name.
|
||||
ErrDatabaseNameRequired = newError("database name required")
|
||||
)
|
||||
|
@ -54,9 +51,6 @@ var (
|
|||
// on a default retention policy.
|
||||
ErrRetentionPolicyDefault = newError("retention policy is default")
|
||||
|
||||
// ErrRetentionPolicyNotFound is returned when mutating a policy that doesn't exist.
|
||||
ErrRetentionPolicyNotFound = newError("retention policy not found")
|
||||
|
||||
// ErrRetentionPolicyNameRequired is returned when creating a policy without a name.
|
||||
ErrRetentionPolicyNameRequired = newError("retention policy name required")
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
)
|
||||
|
@ -292,7 +293,7 @@ func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.Sh
|
|||
if err != nil {
|
||||
return &influxql.Result{Err: err}
|
||||
} else if di == nil {
|
||||
return &influxql.Result{Err: ErrDatabaseNotFound}
|
||||
return &influxql.Result{Err: influxdb.ErrDatabaseNotFound(q.Database)}
|
||||
}
|
||||
|
||||
row := &models.Row{Columns: []string{"name", "duration", "replicaN", "default"}}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
|
@ -659,7 +660,8 @@ func TestStatementExecutor_ExecuteStatement_ShowRetentionPolicies_ErrDatabaseNot
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES ON db0`)); res.Err != meta.ErrDatabaseNotFound {
|
||||
expErr := influxdb.ErrDatabaseNotFound("db0")
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES ON db0`)); res.Err.Error() != expErr.Error() {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta/internal"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
@ -171,6 +172,14 @@ func NewStore(c *Config) *Store {
|
|||
return s
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Store) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
if s.rpc != nil {
|
||||
s.rpc.logger = l
|
||||
}
|
||||
}
|
||||
|
||||
// Path returns the root path when open.
|
||||
// Returns an empty string when the store is closed.
|
||||
func (s *Store) Path() string { return s.path }
|
||||
|
@ -1105,7 +1114,7 @@ func (s *Store) DefaultRetentionPolicy(database string) (rpi *RetentionPolicyInf
|
|||
err = s.read(func(data *Data) error {
|
||||
di := data.Database(database)
|
||||
if di == nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
for i := range di.RetentionPolicies {
|
||||
|
@ -1124,7 +1133,7 @@ func (s *Store) RetentionPolicies(database string) (a []RetentionPolicyInfo, err
|
|||
err = s.read(func(data *Data) error {
|
||||
di := data.Database(database)
|
||||
if di != nil {
|
||||
return ErrDatabaseNotFound
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
a = di.RetentionPolicies
|
||||
return nil
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tcp"
|
||||
"github.com/influxdb/influxdb/toml"
|
||||
|
@ -240,7 +241,8 @@ func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
|
|||
s := MustOpenStore()
|
||||
defer s.Close()
|
||||
|
||||
if err := s.DropDatabase("no_such_database"); err != meta.ErrDatabaseNotFound {
|
||||
expErr := influxdb.ErrDatabaseNotFound("no_such_database")
|
||||
if err := s.DropDatabase("no_such_database"); err.Error() != expErr.Error() {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
|
@ -329,7 +330,7 @@ func (m *Monitor) createInternalStorage() {
|
|||
return
|
||||
}
|
||||
|
||||
if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != meta.ErrRetentionPolicyNotFound {
|
||||
if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != influxdb.ErrRetentionPolicyNotFound("default") {
|
||||
m.Logger.Printf("failed to delete retention policy 'default', failed to created internal storage: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
|
|
@ -85,6 +85,11 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.logger = l
|
||||
}
|
||||
|
||||
// Diagnostics returns diagnostics information.
|
||||
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
|
||||
diagnostics := map[string]interface{}{
|
||||
|
|
Loading…
Reference in New Issue