move helpers to their own file
parent
99446786f9
commit
212bbd067f
|
@ -0,0 +1,265 @@
|
|||
// This package is a set of convenience helpers and structs to make integration testing easier
|
||||
package run_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cmd/influxd/run"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/services/httpd"
|
||||
"github.com/influxdb/influxdb/toml"
|
||||
)
|
||||
|
||||
// Server represents a test wrapper for run.Server.
|
||||
type Server struct {
|
||||
*run.Server
|
||||
Config *run.Config
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
func NewServer(c *run.Config, joinURLs string) *Server {
|
||||
|
||||
s := Server{
|
||||
Server: run.NewServer(c, joinURLs),
|
||||
Config: c,
|
||||
}
|
||||
// Set the logger to discard unless verbose is on
|
||||
if !testing.Verbose() {
|
||||
type logSetter interface {
|
||||
SetLogger(*log.Logger)
|
||||
}
|
||||
nullLogger := log.New(ioutil.Discard, "", 0)
|
||||
s.MetaStore.Logger = nullLogger
|
||||
s.TSDBStore.Logger = nullLogger
|
||||
for _, service := range s.Services {
|
||||
if service, ok := service.(logSetter); ok {
|
||||
service.SetLogger(nullLogger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &s
|
||||
}
|
||||
|
||||
// OpenServer opens a test server.
|
||||
func OpenServer(c *run.Config, joinURLs string) *Server {
|
||||
s := NewServer(c, joinURLs)
|
||||
if err := s.Open(); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Close shuts down the server and removes all temporary paths.
|
||||
func (s *Server) Close() {
|
||||
os.RemoveAll(s.Config.Meta.Dir)
|
||||
os.RemoveAll(s.Config.Data.Dir)
|
||||
s.Server.Close()
|
||||
}
|
||||
|
||||
// URL returns the base URL for the httpd endpoint.
|
||||
func (s *Server) URL() string {
|
||||
for _, service := range s.Services {
|
||||
if service, ok := service.(*httpd.Service); ok {
|
||||
return "http://" + service.Addr().String()
|
||||
}
|
||||
}
|
||||
panic("httpd server not found in services")
|
||||
}
|
||||
|
||||
// CreateDatabaseAndRetentionPolicy will create the datbase and retnetion policy.
|
||||
func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicyInfo) error {
|
||||
if _, err := s.MetaStore.CreateDatabase(db); err != nil {
|
||||
return err
|
||||
} else if _, err := s.MetaStore.CreateRetentionPolicy(db, rp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query executes a query against the server and returns the results.
|
||||
func (s *Server) Query(query string) (results string, err error) {
|
||||
return s.QueryWithParams(query, nil)
|
||||
}
|
||||
|
||||
// Query executes a query against the server and returns the results.
|
||||
func (s *Server) QueryWithParams(query string, values url.Values) (results string, err error) {
|
||||
if values == nil {
|
||||
values = url.Values{}
|
||||
}
|
||||
values.Set("q", query)
|
||||
resp, err := http.Get(s.URL() + "/query?" + values.Encode())
|
||||
if err != nil {
|
||||
return "", err
|
||||
//} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusBadRequest {
|
||||
}
|
||||
body := string(MustReadAll(resp.Body))
|
||||
switch resp.StatusCode {
|
||||
case http.StatusBadRequest:
|
||||
if !expectPattern(".*error parsing query*.", body) {
|
||||
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
|
||||
}
|
||||
return body, nil
|
||||
case http.StatusOK:
|
||||
return body, nil
|
||||
default:
|
||||
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
|
||||
}
|
||||
}
|
||||
|
||||
// Write executes a write against the server and returns the results.
|
||||
func (s *Server) Write(db, rp, body string) (results string, err error) {
|
||||
v := url.Values{"db": {db}, "rp": {rp}}
|
||||
resp, err := http.Post(s.URL()+"/write?"+v.Encode(), "", strings.NewReader(body))
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
|
||||
return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body))
|
||||
}
|
||||
return string(MustReadAll(resp.Body)), nil
|
||||
}
|
||||
|
||||
// NewConfig returns the default config with temporary paths.
|
||||
func NewConfig() *run.Config {
|
||||
c := run.NewConfig()
|
||||
c.Cluster.BindAddress = "127.0.0.1:0"
|
||||
c.Meta.Dir = MustTempFile()
|
||||
c.Meta.BindAddress = "127.0.0.1:0"
|
||||
c.Meta.HeartbeatTimeout = toml.Duration(50 * time.Millisecond)
|
||||
c.Meta.ElectionTimeout = toml.Duration(50 * time.Millisecond)
|
||||
c.Meta.LeaderLeaseTimeout = toml.Duration(50 * time.Millisecond)
|
||||
c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond)
|
||||
|
||||
c.Data.Dir = MustTempFile()
|
||||
|
||||
c.HTTPD.Enabled = true
|
||||
c.HTTPD.BindAddress = "127.0.0.1:0"
|
||||
c.HTTPD.LogEnabled = testing.Verbose()
|
||||
return c
|
||||
}
|
||||
|
||||
func newRetentionPolicyInfo(name string, rf int, duration time.Duration) *meta.RetentionPolicyInfo {
|
||||
return &meta.RetentionPolicyInfo{Name: name, ReplicaN: rf, Duration: duration}
|
||||
}
|
||||
|
||||
func now() time.Time {
|
||||
return time.Now().UTC()
|
||||
}
|
||||
|
||||
func yesterday() time.Time {
|
||||
return now().Add(-1 * time.Hour * 24)
|
||||
}
|
||||
|
||||
func mustParseTime(layout, value string) time.Time {
|
||||
tm, err := time.Parse(layout, value)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return tm
|
||||
}
|
||||
|
||||
// MustReadAll reads r. Panic on error.
|
||||
func MustReadAll(r io.Reader) []byte {
|
||||
b, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// MustTempFile returns a path to a temporary file.
|
||||
func MustTempFile() string {
|
||||
f, err := ioutil.TempFile("", "influxd-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
return f.Name()
|
||||
}
|
||||
|
||||
func expectPattern(exp, act string) bool {
|
||||
re := regexp.MustCompile(exp)
|
||||
if !re.MatchString(act) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
name string
|
||||
command string
|
||||
params url.Values
|
||||
exp, act string
|
||||
pattern bool
|
||||
skip bool
|
||||
}
|
||||
|
||||
// Execute runs the command and returns an err if it fails
|
||||
func (q *Query) Execute(s *Server) (err error) {
|
||||
if q.params == nil {
|
||||
q.act, err = s.Query(q.command)
|
||||
return
|
||||
}
|
||||
q.act, err = s.QueryWithParams(q.command, q.params)
|
||||
return
|
||||
}
|
||||
|
||||
func (q *Query) success() bool {
|
||||
if q.pattern {
|
||||
return expectPattern(q.exp, q.act)
|
||||
}
|
||||
return q.exp == q.act
|
||||
}
|
||||
|
||||
func (q *Query) Error(err error) string {
|
||||
return fmt.Sprintf("%s: %v", q.name, err)
|
||||
}
|
||||
|
||||
func (q *Query) failureMessage() string {
|
||||
return fmt.Sprintf("%s: unexpected results for query: %s\nexp: %s\nactual: %s\n", q.name, q.command, q.exp, q.act)
|
||||
}
|
||||
|
||||
type Test struct {
|
||||
initialized bool
|
||||
write string
|
||||
db string
|
||||
rp string
|
||||
exp string
|
||||
queries []*Query
|
||||
}
|
||||
|
||||
func NewTest(db, rp string) Test {
|
||||
return Test{
|
||||
db: db,
|
||||
rp: rp,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Test) addQueries(q ...*Query) {
|
||||
t.queries = append(t.queries, q...)
|
||||
}
|
||||
|
||||
func (t *Test) init(s *Server) error {
|
||||
if t.write == "" || t.initialized {
|
||||
return nil
|
||||
}
|
||||
t.initialized = true
|
||||
if res, err := s.Write(t.db, t.rp, t.write); err != nil {
|
||||
return err
|
||||
} else if t.exp != res {
|
||||
return fmt.Errorf("unexpected results\nexp: %s\ngot: %s\n", t.exp, res)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -2,22 +2,11 @@ package run_test
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cmd/influxd/run"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/services/httpd"
|
||||
"github.com/influxdb/influxdb/toml"
|
||||
)
|
||||
|
||||
// Ensure the database commands work.
|
||||
|
@ -33,6 +22,16 @@ func TestServer_DatabaseCommands(t *testing.T) {
|
|||
command: `CREATE DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error with bad name",
|
||||
command: `CREATE DATABASE 0xdb0`,
|
||||
exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show database should succeed",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create database should error if it already exists",
|
||||
command: `CREATE DATABASE db0`,
|
||||
|
@ -44,6 +43,12 @@ func TestServer_DatabaseCommands(t *testing.T) {
|
|||
command: `DROP DATABASE db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
skip: true,
|
||||
name: "show database should have no results - FIXME pauldix",
|
||||
command: `SHOW DATABASES`,
|
||||
exp: `FIXME`,
|
||||
},
|
||||
&Query{
|
||||
skip: true,
|
||||
name: "drop database should error if it doesn't exist - FIXME pauldix",
|
||||
|
@ -130,6 +135,101 @@ func TestServer_RetentionPolicyCommands(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure user commands work.
|
||||
func TestServer_UserCommands(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
// Create a database.
|
||||
if _, err := s.MetaStore.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
test := Test{
|
||||
queries: []*Query{
|
||||
&Query{
|
||||
name: "show users, no actual users",
|
||||
command: `SHOW USERS`,
|
||||
exp: `{"results":[{"series":[{"columns":["user","admin"]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: `create user`,
|
||||
command: "CREATE USER jdoe WITH PASSWORD '1337'",
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show users, 1 existing user",
|
||||
command: `SHOW USERS`,
|
||||
exp: `{"results":[{"series":[{"columns":["user","admin"],"values":[["jdoe",false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "grant all priviledges to jdoe",
|
||||
command: `GRANT ALL PRIVILEGES TO jdoe`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
skip: true,
|
||||
name: "show users, existing user as admin - FIXME",
|
||||
command: `SHOW USERS`,
|
||||
exp: `{"results":[{"series":[{"columns":["user","admin"],"values":[["jdoe",true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "grant DB privileges to user",
|
||||
command: `GRANT READ ON db0 TO jdoe`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "revoke all privileges",
|
||||
command: `REVOKE ALL PRIVILEGES FROM jdoe`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "bad create user request",
|
||||
command: `CREATE USER 0xBAD WITH PASSWORD pwd1337`,
|
||||
exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 13"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "bad create user request, no name",
|
||||
command: `CREATE USER WITH PASSWORD pwd1337`,
|
||||
exp: `{"error":"error parsing query: found WITH, expected identifier at line 1, char 13"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "bad create user request, no password",
|
||||
command: `CREATE USER jdoe`,
|
||||
exp: `{"error":"error parsing query: found EOF, expected WITH at line 1, char 18"}`,
|
||||
},
|
||||
&Query{
|
||||
name: "drop user",
|
||||
command: `DROP USER jdoe`,
|
||||
exp: `{"results":[{}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "make sure user was dropped",
|
||||
command: `SHOW USERS`,
|
||||
exp: `{"results":[{"series":[{"columns":["user","admin"]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "delete non existing user",
|
||||
command: `DROP USER noone`,
|
||||
exp: `{"results":[{"error":"user not found"}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(fmt.Sprintf("command: %s - err: %s", query.command, query.Error(err)))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the server can create a single point via json protocol and read it back.
|
||||
func TestServer_Write_JSON(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
@ -140,7 +240,7 @@ func TestServer_Write_JSON(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
if res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano))); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := ``; exp != res {
|
||||
|
@ -165,7 +265,7 @@ func TestServer_Write_LineProtocol_Float(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=1.0 `+strconv.FormatInt(now.UnixNano(), 10)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := ``; exp != res {
|
||||
|
@ -190,7 +290,7 @@ func TestServer_Write_LineProtocol_Bool(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=true `+strconv.FormatInt(now.UnixNano(), 10)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := ``; exp != res {
|
||||
|
@ -215,7 +315,7 @@ func TestServer_Write_LineProtocol_String(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
if res, err := s.Write("db0", "rp0", `cpu,host=server01 value="disk full" `+strconv.FormatInt(now.UnixNano(), 10)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := ``; exp != res {
|
||||
|
@ -240,7 +340,7 @@ func TestServer_Write_LineProtocol_Integer(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=100 `+strconv.FormatInt(now.UnixNano(), 10)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := ``; exp != res {
|
||||
|
@ -320,7 +420,7 @@ func TestServer_Query_Count(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10)
|
||||
|
@ -365,7 +465,7 @@ func TestServer_Query_Now(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10)
|
||||
|
@ -411,7 +511,7 @@ func TestServer_Query_EpochPrecision(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10)
|
||||
|
@ -482,7 +582,7 @@ func TestServer_Query_Tags(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = fmt.Sprintf("cpu,host=server01 value=100,core=4 %s\ncpu,host=server02 value=50,core=2 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10))
|
||||
|
@ -542,12 +642,22 @@ func TestServer_Query_Common(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := now()
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = fmt.Sprintf("cpu,host=server01 value=1 %s", strconv.FormatInt(now.UnixNano(), 10))
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "selecting a from a non-existent database should error",
|
||||
command: `SELECT value FROM db1.rp0.cpu`,
|
||||
exp: `{"results":[{"error":"database not found"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "selecting a from a non-existent retention policy should error",
|
||||
command: `SELECT value FROM db0.rp1.cpu`,
|
||||
exp: `{"results":[{"error":"retention policy not found"}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "selecting a valid measurement and field should succeed",
|
||||
command: `SELECT value FROM db0.rp0.cpu`,
|
||||
|
@ -893,237 +1003,3 @@ func TestServer_Query_Regex(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Server represents a test wrapper for run.Server.
|
||||
type Server struct {
|
||||
*run.Server
|
||||
Config *run.Config
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
func NewServer(c *run.Config, joinURLs string) *Server {
|
||||
|
||||
s := Server{
|
||||
Server: run.NewServer(c, joinURLs),
|
||||
Config: c,
|
||||
}
|
||||
// Set the logger to discard unless verbose is on
|
||||
if !testing.Verbose() {
|
||||
type logSetter interface {
|
||||
SetLogger(*log.Logger)
|
||||
}
|
||||
nullLogger := log.New(ioutil.Discard, "", 0)
|
||||
s.MetaStore.Logger = nullLogger
|
||||
s.TSDBStore.Logger = nullLogger
|
||||
for _, service := range s.Services {
|
||||
if service, ok := service.(logSetter); ok {
|
||||
service.SetLogger(nullLogger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &s
|
||||
}
|
||||
|
||||
// OpenServer opens a test server.
|
||||
func OpenServer(c *run.Config, joinURLs string) *Server {
|
||||
s := NewServer(c, joinURLs)
|
||||
if err := s.Open(); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Close shuts down the server and removes all temporary paths.
|
||||
func (s *Server) Close() {
|
||||
os.RemoveAll(s.Config.Meta.Dir)
|
||||
os.RemoveAll(s.Config.Data.Dir)
|
||||
s.Server.Close()
|
||||
}
|
||||
|
||||
// URL returns the base URL for the httpd endpoint.
|
||||
func (s *Server) URL() string {
|
||||
for _, service := range s.Services {
|
||||
if service, ok := service.(*httpd.Service); ok {
|
||||
return "http://" + service.Addr().String()
|
||||
}
|
||||
}
|
||||
panic("httpd server not found in services")
|
||||
}
|
||||
|
||||
// CreateDatabaseAndRetentionPolicy will create the datbase and retnetion policy.
|
||||
func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicyInfo) error {
|
||||
if _, err := s.MetaStore.CreateDatabase(db); err != nil {
|
||||
return err
|
||||
} else if _, err := s.MetaStore.CreateRetentionPolicy(db, rp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query executes a query against the server and returns the results.
|
||||
func (s *Server) Query(query string) (results string, err error) {
|
||||
return s.QueryWithParams(query, nil)
|
||||
}
|
||||
|
||||
// Query executes a query against the server and returns the results.
|
||||
func (s *Server) QueryWithParams(query string, values url.Values) (results string, err error) {
|
||||
if values == nil {
|
||||
values = url.Values{}
|
||||
}
|
||||
values.Set("q", query)
|
||||
resp, err := http.Get(s.URL() + "/query?" + values.Encode())
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if resp.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body))
|
||||
}
|
||||
return string(MustReadAll(resp.Body)), nil
|
||||
}
|
||||
|
||||
// Write executes a write against the server and returns the results.
|
||||
func (s *Server) Write(db, rp, body string) (results string, err error) {
|
||||
v := url.Values{"db": {db}, "rp": {rp}}
|
||||
resp, err := http.Post(s.URL()+"/write?"+v.Encode(), "", strings.NewReader(body))
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
|
||||
return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body))
|
||||
}
|
||||
return string(MustReadAll(resp.Body)), nil
|
||||
}
|
||||
|
||||
// NewConfig returns the default config with temporary paths.
|
||||
func NewConfig() *run.Config {
|
||||
c := run.NewConfig()
|
||||
c.Cluster.BindAddress = "127.0.0.1:0"
|
||||
c.Meta.Dir = MustTempFile()
|
||||
c.Meta.BindAddress = "127.0.0.1:0"
|
||||
c.Meta.HeartbeatTimeout = toml.Duration(50 * time.Millisecond)
|
||||
c.Meta.ElectionTimeout = toml.Duration(50 * time.Millisecond)
|
||||
c.Meta.LeaderLeaseTimeout = toml.Duration(50 * time.Millisecond)
|
||||
c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond)
|
||||
|
||||
c.Data.Dir = MustTempFile()
|
||||
|
||||
c.HTTPD.Enabled = true
|
||||
c.HTTPD.BindAddress = "127.0.0.1:0"
|
||||
c.HTTPD.LogEnabled = testing.Verbose()
|
||||
return c
|
||||
}
|
||||
|
||||
func newRetentionPolicyInfo(name string, rf int, duration time.Duration) *meta.RetentionPolicyInfo {
|
||||
return &meta.RetentionPolicyInfo{Name: name, ReplicaN: rf, Duration: duration}
|
||||
}
|
||||
|
||||
func now() time.Time {
|
||||
return time.Now().UTC()
|
||||
}
|
||||
|
||||
func yesterday() time.Time {
|
||||
return now().Add(-1 * time.Hour * 24)
|
||||
}
|
||||
|
||||
func mustParseTime(layout, value string) time.Time {
|
||||
tm, err := time.Parse(layout, value)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return tm
|
||||
}
|
||||
|
||||
// MustReadAll reads r. Panic on error.
|
||||
func MustReadAll(r io.Reader) []byte {
|
||||
b, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// MustTempFile returns a path to a temporary file.
|
||||
func MustTempFile() string {
|
||||
f, err := ioutil.TempFile("", "influxd-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
return f.Name()
|
||||
}
|
||||
|
||||
func expectPattern(exp, act string) bool {
|
||||
re := regexp.MustCompile(exp)
|
||||
if !re.MatchString(act) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
name string
|
||||
command string
|
||||
params url.Values
|
||||
exp, act string
|
||||
pattern bool
|
||||
skip bool
|
||||
}
|
||||
|
||||
// Execute runs the command and returns an err if it fails
|
||||
func (q *Query) Execute(s *Server) (err error) {
|
||||
if q.params == nil {
|
||||
q.act, err = s.Query(q.command)
|
||||
return
|
||||
}
|
||||
q.act, err = s.QueryWithParams(q.command, q.params)
|
||||
return
|
||||
}
|
||||
|
||||
func (q *Query) success() bool {
|
||||
if q.pattern {
|
||||
return expectPattern(q.exp, q.act)
|
||||
}
|
||||
return q.exp == q.act
|
||||
}
|
||||
|
||||
func (q *Query) Error(err error) string {
|
||||
return fmt.Sprintf("%s: %v", q.name, err)
|
||||
}
|
||||
|
||||
func (q *Query) failureMessage() string {
|
||||
return fmt.Sprintf("%s: unexpected results for query: %s\nexp: %s\nactual: %s\n", q.name, q.command, q.exp, q.act)
|
||||
}
|
||||
|
||||
type Test struct {
|
||||
initialized bool
|
||||
write string
|
||||
db string
|
||||
rp string
|
||||
exp string
|
||||
queries []*Query
|
||||
}
|
||||
|
||||
func NewTest(db, rp string) Test {
|
||||
return Test{
|
||||
db: db,
|
||||
rp: rp,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Test) addQueries(q ...*Query) {
|
||||
t.queries = append(t.queries, q...)
|
||||
}
|
||||
|
||||
func (t *Test) init(s *Server) error {
|
||||
if t.write == "" || t.initialized {
|
||||
return nil
|
||||
}
|
||||
t.initialized = true
|
||||
if res, err := s.Write(t.db, t.rp, t.write); err != nil {
|
||||
return err
|
||||
} else if t.exp != res {
|
||||
return fmt.Errorf("unexpected results\nexp: %s\ngot: %s\n", t.exp, res)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue