diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go new file mode 100644 index 0000000000..c679fc0e6a --- /dev/null +++ b/cmd/influxd/run/server_helpers_test.go @@ -0,0 +1,265 @@ +// This package is a set of convenience helpers and structs to make integration testing easier +package run_test + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "regexp" + "strings" + "testing" + "time" + + "github.com/influxdb/influxdb/cmd/influxd/run" + "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/services/httpd" + "github.com/influxdb/influxdb/toml" +) + +// Server represents a test wrapper for run.Server. +type Server struct { + *run.Server + Config *run.Config +} + +// NewServer returns a new instance of Server. +func NewServer(c *run.Config, joinURLs string) *Server { + + s := Server{ + Server: run.NewServer(c, joinURLs), + Config: c, + } + // Set the logger to discard unless verbose is on + if !testing.Verbose() { + type logSetter interface { + SetLogger(*log.Logger) + } + nullLogger := log.New(ioutil.Discard, "", 0) + s.MetaStore.Logger = nullLogger + s.TSDBStore.Logger = nullLogger + for _, service := range s.Services { + if service, ok := service.(logSetter); ok { + service.SetLogger(nullLogger) + } + } + } + + return &s +} + +// OpenServer opens a test server. +func OpenServer(c *run.Config, joinURLs string) *Server { + s := NewServer(c, joinURLs) + if err := s.Open(); err != nil { + panic(err.Error()) + } + + return s +} + +// Close shuts down the server and removes all temporary paths. +func (s *Server) Close() { + os.RemoveAll(s.Config.Meta.Dir) + os.RemoveAll(s.Config.Data.Dir) + s.Server.Close() +} + +// URL returns the base URL for the httpd endpoint. +func (s *Server) URL() string { + for _, service := range s.Services { + if service, ok := service.(*httpd.Service); ok { + return "http://" + service.Addr().String() + } + } + panic("httpd server not found in services") +} + +// CreateDatabaseAndRetentionPolicy will create the datbase and retnetion policy. +func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicyInfo) error { + if _, err := s.MetaStore.CreateDatabase(db); err != nil { + return err + } else if _, err := s.MetaStore.CreateRetentionPolicy(db, rp); err != nil { + return err + } + return nil +} + +// Query executes a query against the server and returns the results. +func (s *Server) Query(query string) (results string, err error) { + return s.QueryWithParams(query, nil) +} + +// Query executes a query against the server and returns the results. +func (s *Server) QueryWithParams(query string, values url.Values) (results string, err error) { + if values == nil { + values = url.Values{} + } + values.Set("q", query) + resp, err := http.Get(s.URL() + "/query?" + values.Encode()) + if err != nil { + return "", err + //} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusBadRequest { + } + body := string(MustReadAll(resp.Body)) + switch resp.StatusCode { + case http.StatusBadRequest: + if !expectPattern(".*error parsing query*.", body) { + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } + return body, nil + case http.StatusOK: + return body, nil + default: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } +} + +// Write executes a write against the server and returns the results. +func (s *Server) Write(db, rp, body string) (results string, err error) { + v := url.Values{"db": {db}, "rp": {rp}} + resp, err := http.Post(s.URL()+"/write?"+v.Encode(), "", strings.NewReader(body)) + if err != nil { + return "", err + } else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body)) + } + return string(MustReadAll(resp.Body)), nil +} + +// NewConfig returns the default config with temporary paths. +func NewConfig() *run.Config { + c := run.NewConfig() + c.Cluster.BindAddress = "127.0.0.1:0" + c.Meta.Dir = MustTempFile() + c.Meta.BindAddress = "127.0.0.1:0" + c.Meta.HeartbeatTimeout = toml.Duration(50 * time.Millisecond) + c.Meta.ElectionTimeout = toml.Duration(50 * time.Millisecond) + c.Meta.LeaderLeaseTimeout = toml.Duration(50 * time.Millisecond) + c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond) + + c.Data.Dir = MustTempFile() + + c.HTTPD.Enabled = true + c.HTTPD.BindAddress = "127.0.0.1:0" + c.HTTPD.LogEnabled = testing.Verbose() + return c +} + +func newRetentionPolicyInfo(name string, rf int, duration time.Duration) *meta.RetentionPolicyInfo { + return &meta.RetentionPolicyInfo{Name: name, ReplicaN: rf, Duration: duration} +} + +func now() time.Time { + return time.Now().UTC() +} + +func yesterday() time.Time { + return now().Add(-1 * time.Hour * 24) +} + +func mustParseTime(layout, value string) time.Time { + tm, err := time.Parse(layout, value) + if err != nil { + panic(err) + } + return tm +} + +// MustReadAll reads r. Panic on error. +func MustReadAll(r io.Reader) []byte { + b, err := ioutil.ReadAll(r) + if err != nil { + panic(err) + } + return b +} + +// MustTempFile returns a path to a temporary file. +func MustTempFile() string { + f, err := ioutil.TempFile("", "influxd-") + if err != nil { + panic(err) + } + f.Close() + os.Remove(f.Name()) + return f.Name() +} + +func expectPattern(exp, act string) bool { + re := regexp.MustCompile(exp) + if !re.MatchString(act) { + return false + } + return true +} + +type Query struct { + name string + command string + params url.Values + exp, act string + pattern bool + skip bool +} + +// Execute runs the command and returns an err if it fails +func (q *Query) Execute(s *Server) (err error) { + if q.params == nil { + q.act, err = s.Query(q.command) + return + } + q.act, err = s.QueryWithParams(q.command, q.params) + return +} + +func (q *Query) success() bool { + if q.pattern { + return expectPattern(q.exp, q.act) + } + return q.exp == q.act +} + +func (q *Query) Error(err error) string { + return fmt.Sprintf("%s: %v", q.name, err) +} + +func (q *Query) failureMessage() string { + return fmt.Sprintf("%s: unexpected results for query: %s\nexp: %s\nactual: %s\n", q.name, q.command, q.exp, q.act) +} + +type Test struct { + initialized bool + write string + db string + rp string + exp string + queries []*Query +} + +func NewTest(db, rp string) Test { + return Test{ + db: db, + rp: rp, + } +} + +func (t *Test) addQueries(q ...*Query) { + t.queries = append(t.queries, q...) +} + +func (t *Test) init(s *Server) error { + if t.write == "" || t.initialized { + return nil + } + t.initialized = true + if res, err := s.Write(t.db, t.rp, t.write); err != nil { + return err + } else if t.exp != res { + return fmt.Errorf("unexpected results\nexp: %s\ngot: %s\n", t.exp, res) + } + return nil +} diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index cdcb813bf3..c3cb2cfa4a 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -2,22 +2,11 @@ package run_test import ( "fmt" - "io" - "io/ioutil" - "log" - "net/http" "net/url" - "os" - "regexp" "strconv" "strings" "testing" "time" - - "github.com/influxdb/influxdb/cmd/influxd/run" - "github.com/influxdb/influxdb/meta" - "github.com/influxdb/influxdb/services/httpd" - "github.com/influxdb/influxdb/toml" ) // Ensure the database commands work. @@ -33,6 +22,16 @@ func TestServer_DatabaseCommands(t *testing.T) { command: `CREATE DATABASE db0`, exp: `{"results":[{}]}`, }, + &Query{ + name: "create database should error with bad name", + command: `CREATE DATABASE 0xdb0`, + exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`, + }, + &Query{ + name: "show database should succeed", + command: `SHOW DATABASES`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`, + }, &Query{ name: "create database should error if it already exists", command: `CREATE DATABASE db0`, @@ -44,6 +43,12 @@ func TestServer_DatabaseCommands(t *testing.T) { command: `DROP DATABASE db0`, exp: `{"results":[{}]}`, }, + &Query{ + skip: true, + name: "show database should have no results - FIXME pauldix", + command: `SHOW DATABASES`, + exp: `FIXME`, + }, &Query{ skip: true, name: "drop database should error if it doesn't exist - FIXME pauldix", @@ -130,6 +135,101 @@ func TestServer_RetentionPolicyCommands(t *testing.T) { } } +// Ensure user commands work. +func TestServer_UserCommands(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig(), "") + defer s.Close() + + // Create a database. + if _, err := s.MetaStore.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + + test := Test{ + queries: []*Query{ + &Query{ + name: "show users, no actual users", + command: `SHOW USERS`, + exp: `{"results":[{"series":[{"columns":["user","admin"]}]}]}`, + }, + &Query{ + name: `create user`, + command: "CREATE USER jdoe WITH PASSWORD '1337'", + exp: `{"results":[{}]}`, + }, + &Query{ + name: "show users, 1 existing user", + command: `SHOW USERS`, + exp: `{"results":[{"series":[{"columns":["user","admin"],"values":[["jdoe",false]]}]}]}`, + }, + &Query{ + name: "grant all priviledges to jdoe", + command: `GRANT ALL PRIVILEGES TO jdoe`, + exp: `{"results":[{}]}`, + }, + &Query{ + skip: true, + name: "show users, existing user as admin - FIXME", + command: `SHOW USERS`, + exp: `{"results":[{"series":[{"columns":["user","admin"],"values":[["jdoe",true]]}]}]}`, + }, + &Query{ + name: "grant DB privileges to user", + command: `GRANT READ ON db0 TO jdoe`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "revoke all privileges", + command: `REVOKE ALL PRIVILEGES FROM jdoe`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "bad create user request", + command: `CREATE USER 0xBAD WITH PASSWORD pwd1337`, + exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 13"}`, + }, + &Query{ + name: "bad create user request, no name", + command: `CREATE USER WITH PASSWORD pwd1337`, + exp: `{"error":"error parsing query: found WITH, expected identifier at line 1, char 13"}`, + }, + &Query{ + name: "bad create user request, no password", + command: `CREATE USER jdoe`, + exp: `{"error":"error parsing query: found EOF, expected WITH at line 1, char 18"}`, + }, + &Query{ + name: "drop user", + command: `DROP USER jdoe`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "make sure user was dropped", + command: `SHOW USERS`, + exp: `{"results":[{"series":[{"columns":["user","admin"]}]}]}`, + }, + &Query{ + name: "delete non existing user", + command: `DROP USER noone`, + exp: `{"results":[{"error":"user not found"}]}`, + }, + }, + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(fmt.Sprintf("command: %s - err: %s", query.command, query.Error(err))) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + // Ensure the server can create a single point via json protocol and read it back. func TestServer_Write_JSON(t *testing.T) { t.Parallel() @@ -140,7 +240,7 @@ func TestServer_Write_JSON(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() if res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano))); err != nil { t.Fatal(err) } else if exp := ``; exp != res { @@ -165,7 +265,7 @@ func TestServer_Write_LineProtocol_Float(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=1.0 `+strconv.FormatInt(now.UnixNano(), 10)); err != nil { t.Fatal(err) } else if exp := ``; exp != res { @@ -190,7 +290,7 @@ func TestServer_Write_LineProtocol_Bool(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=true `+strconv.FormatInt(now.UnixNano(), 10)); err != nil { t.Fatal(err) } else if exp := ``; exp != res { @@ -215,7 +315,7 @@ func TestServer_Write_LineProtocol_String(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() if res, err := s.Write("db0", "rp0", `cpu,host=server01 value="disk full" `+strconv.FormatInt(now.UnixNano(), 10)); err != nil { t.Fatal(err) } else if exp := ``; exp != res { @@ -240,7 +340,7 @@ func TestServer_Write_LineProtocol_Integer(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=100 `+strconv.FormatInt(now.UnixNano(), 10)); err != nil { t.Fatal(err) } else if exp := ``; exp != res { @@ -320,7 +420,7 @@ func TestServer_Query_Count(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() test := NewTest("db0", "rp0") test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10) @@ -365,7 +465,7 @@ func TestServer_Query_Now(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() test := NewTest("db0", "rp0") test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10) @@ -411,7 +511,7 @@ func TestServer_Query_EpochPrecision(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() test := NewTest("db0", "rp0") test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10) @@ -482,7 +582,7 @@ func TestServer_Query_Tags(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() test := NewTest("db0", "rp0") test.write = fmt.Sprintf("cpu,host=server01 value=100,core=4 %s\ncpu,host=server02 value=50,core=2 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10)) @@ -542,12 +642,22 @@ func TestServer_Query_Common(t *testing.T) { t.Fatal(err) } - now := time.Now().UTC() + now := now() test := NewTest("db0", "rp0") test.write = fmt.Sprintf("cpu,host=server01 value=1 %s", strconv.FormatInt(now.UnixNano(), 10)) test.addQueries([]*Query{ + &Query{ + name: "selecting a from a non-existent database should error", + command: `SELECT value FROM db1.rp0.cpu`, + exp: `{"results":[{"error":"database not found"}]}`, + }, + &Query{ + name: "selecting a from a non-existent retention policy should error", + command: `SELECT value FROM db0.rp1.cpu`, + exp: `{"results":[{"error":"retention policy not found"}]}`, + }, &Query{ name: "selecting a valid measurement and field should succeed", command: `SELECT value FROM db0.rp0.cpu`, @@ -893,237 +1003,3 @@ func TestServer_Query_Regex(t *testing.T) { } } } - -// Server represents a test wrapper for run.Server. -type Server struct { - *run.Server - Config *run.Config -} - -// NewServer returns a new instance of Server. -func NewServer(c *run.Config, joinURLs string) *Server { - - s := Server{ - Server: run.NewServer(c, joinURLs), - Config: c, - } - // Set the logger to discard unless verbose is on - if !testing.Verbose() { - type logSetter interface { - SetLogger(*log.Logger) - } - nullLogger := log.New(ioutil.Discard, "", 0) - s.MetaStore.Logger = nullLogger - s.TSDBStore.Logger = nullLogger - for _, service := range s.Services { - if service, ok := service.(logSetter); ok { - service.SetLogger(nullLogger) - } - } - } - - return &s -} - -// OpenServer opens a test server. -func OpenServer(c *run.Config, joinURLs string) *Server { - s := NewServer(c, joinURLs) - if err := s.Open(); err != nil { - panic(err.Error()) - } - - return s -} - -// Close shuts down the server and removes all temporary paths. -func (s *Server) Close() { - os.RemoveAll(s.Config.Meta.Dir) - os.RemoveAll(s.Config.Data.Dir) - s.Server.Close() -} - -// URL returns the base URL for the httpd endpoint. -func (s *Server) URL() string { - for _, service := range s.Services { - if service, ok := service.(*httpd.Service); ok { - return "http://" + service.Addr().String() - } - } - panic("httpd server not found in services") -} - -// CreateDatabaseAndRetentionPolicy will create the datbase and retnetion policy. -func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicyInfo) error { - if _, err := s.MetaStore.CreateDatabase(db); err != nil { - return err - } else if _, err := s.MetaStore.CreateRetentionPolicy(db, rp); err != nil { - return err - } - return nil -} - -// Query executes a query against the server and returns the results. -func (s *Server) Query(query string) (results string, err error) { - return s.QueryWithParams(query, nil) -} - -// Query executes a query against the server and returns the results. -func (s *Server) QueryWithParams(query string, values url.Values) (results string, err error) { - if values == nil { - values = url.Values{} - } - values.Set("q", query) - resp, err := http.Get(s.URL() + "/query?" + values.Encode()) - if err != nil { - return "", err - } else if resp.StatusCode != http.StatusOK { - return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body)) - } - return string(MustReadAll(resp.Body)), nil -} - -// Write executes a write against the server and returns the results. -func (s *Server) Write(db, rp, body string) (results string, err error) { - v := url.Values{"db": {db}, "rp": {rp}} - resp, err := http.Post(s.URL()+"/write?"+v.Encode(), "", strings.NewReader(body)) - if err != nil { - return "", err - } else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body)) - } - return string(MustReadAll(resp.Body)), nil -} - -// NewConfig returns the default config with temporary paths. -func NewConfig() *run.Config { - c := run.NewConfig() - c.Cluster.BindAddress = "127.0.0.1:0" - c.Meta.Dir = MustTempFile() - c.Meta.BindAddress = "127.0.0.1:0" - c.Meta.HeartbeatTimeout = toml.Duration(50 * time.Millisecond) - c.Meta.ElectionTimeout = toml.Duration(50 * time.Millisecond) - c.Meta.LeaderLeaseTimeout = toml.Duration(50 * time.Millisecond) - c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond) - - c.Data.Dir = MustTempFile() - - c.HTTPD.Enabled = true - c.HTTPD.BindAddress = "127.0.0.1:0" - c.HTTPD.LogEnabled = testing.Verbose() - return c -} - -func newRetentionPolicyInfo(name string, rf int, duration time.Duration) *meta.RetentionPolicyInfo { - return &meta.RetentionPolicyInfo{Name: name, ReplicaN: rf, Duration: duration} -} - -func now() time.Time { - return time.Now().UTC() -} - -func yesterday() time.Time { - return now().Add(-1 * time.Hour * 24) -} - -func mustParseTime(layout, value string) time.Time { - tm, err := time.Parse(layout, value) - if err != nil { - panic(err) - } - return tm -} - -// MustReadAll reads r. Panic on error. -func MustReadAll(r io.Reader) []byte { - b, err := ioutil.ReadAll(r) - if err != nil { - panic(err) - } - return b -} - -// MustTempFile returns a path to a temporary file. -func MustTempFile() string { - f, err := ioutil.TempFile("", "influxd-") - if err != nil { - panic(err) - } - f.Close() - os.Remove(f.Name()) - return f.Name() -} - -func expectPattern(exp, act string) bool { - re := regexp.MustCompile(exp) - if !re.MatchString(act) { - return false - } - return true -} - -type Query struct { - name string - command string - params url.Values - exp, act string - pattern bool - skip bool -} - -// Execute runs the command and returns an err if it fails -func (q *Query) Execute(s *Server) (err error) { - if q.params == nil { - q.act, err = s.Query(q.command) - return - } - q.act, err = s.QueryWithParams(q.command, q.params) - return -} - -func (q *Query) success() bool { - if q.pattern { - return expectPattern(q.exp, q.act) - } - return q.exp == q.act -} - -func (q *Query) Error(err error) string { - return fmt.Sprintf("%s: %v", q.name, err) -} - -func (q *Query) failureMessage() string { - return fmt.Sprintf("%s: unexpected results for query: %s\nexp: %s\nactual: %s\n", q.name, q.command, q.exp, q.act) -} - -type Test struct { - initialized bool - write string - db string - rp string - exp string - queries []*Query -} - -func NewTest(db, rp string) Test { - return Test{ - db: db, - rp: rp, - } -} - -func (t *Test) addQueries(q ...*Query) { - t.queries = append(t.queries, q...) -} - -func (t *Test) init(s *Server) error { - if t.write == "" || t.initialized { - return nil - } - t.initialized = true - if res, err := s.Write(t.db, t.rp, t.write); err != nil { - return err - } else if t.exp != res { - return fmt.Errorf("unexpected results\nexp: %s\ngot: %s\n", t.exp, res) - } - return nil -}