Integration test delay.

pull/1935/head
Ben Johnson 2015-03-14 14:31:25 -06:00
commit 06d839223e
9 changed files with 217 additions and 94 deletions

View File

@@ -1,6 +1,11 @@
## v0.9.0-rc12 [unreleased]
## v0.9.0-rc11 [2015-03-12]
### Bugfixes
- [#1942](https://github.com/influxdb/influxdb/pull/1942): Sort wildcard names.
- [#1957](https://github.com/influxdb/influxdb/pull/1957): Graphite numbers are always float64.
- [#1955](https://github.com/influxdb/influxdb/pull/1955): Prohibit creation of databases with no name. Thanks @dullgiulio
## v0.9.0-rc11 [2015-03-13]
### Bugfixes
- [#1917](https://github.com/influxdb/influxdb/pull/1902): Creating Infinite Retention Policy Failed.
@@ -9,6 +14,7 @@
- [#1930](https://github.com/influxdb/influxdb/pull/1930): Auto create database for graphite if not specified.
- [#1908](https://github.com/influxdb/influxdb/pull/1908): Cosmetic CLI output fixes.
- [#1931](https://github.com/influxdb/influxdb/pull/1931): Add default column to SHOW RETENTION POLICIES.
- [#1937](https://github.com/influxdb/influxdb/pull/1937): OFFSET should be allowed to be 0.
### Features
- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duration.

View File

@@ -867,43 +867,79 @@ func TestClientLibrary(t *testing.T) {
os.RemoveAll(dir)
}()
database := "mydb"
retentionPolicy := "myrp"
now := time.Now().UTC()
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290, nil)
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retentionPolicy)
type write struct {
bp client.BatchPoints
expected string
err string
}
type query struct {
query client.Query
expected string
err string
}
type test struct {
name string
db string
rp string
writes []write
queries []query
}
tests := []struct {
name string
bp client.BatchPoints
results client.Results
query client.Query
writeExpected, queryExpected string
writeErr, queryErr string
}{
tests := []test{
{
name: "empty batchpoint",
writeErr: "database is required",
writeExpected: `{"error":"database is required"}`,
name: "empty batchpoint",
writes: []write{
{
err: "database is required",
expected: `{"error":"database is required"}`,
},
},
},
{
name: "no points",
writeExpected: `null`,
bp: client.BatchPoints{Database: "mydb"},
name: "no points",
writes: []write{
{
expected: `null`,
bp: client.BatchPoints{Database: "mydb"},
},
},
},
{
name: "one point",
bp: client.BatchPoints{
Database: "mydb",
Points: []client.Point{
{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Timestamp: now},
writes: []write{
{
bp: client.BatchPoints{
Database: "mydb",
Points: []client.Point{
{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Timestamp: now},
},
},
expected: `null`,
},
},
queries: []query{
{
query: client.Query{Command: `select * from "mydb"."myrp".cpu`},
expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",1.1]]}]}]}`, now.Format(time.RFC3339Nano)),
},
},
},
{
name: "mulitple points, multiple values",
writes: []write{
{bp: client.BatchPoints{Database: "mydb", Points: []client.Point{{Name: "network", Fields: map[string]interface{}{"rx": 1.1, "tx": 2.1}, Timestamp: now}}}, expected: `null`},
{bp: client.BatchPoints{Database: "mydb", Points: []client.Point{{Name: "network", Fields: map[string]interface{}{"rx": 1.2, "tx": 2.2}, Timestamp: now.Add(time.Nanosecond)}}}, expected: `null`},
{bp: client.BatchPoints{Database: "mydb", Points: []client.Point{{Name: "network", Fields: map[string]interface{}{"rx": 1.3, "tx": 2.3}, Timestamp: now.Add(2 * time.Nanosecond)}}}, expected: `null`},
},
queries: []query{
{
query: client.Query{Command: `select * from "mydb"."myrp".network`},
expected: fmt.Sprintf(`{"results":[{"series":[{"name":"network","columns":["time","rx","tx"],"values":[["%s",1.1,2.1],["%s",1.2,2.2],["%s",1.3,2.3]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(time.Nanosecond).Format(time.RFC3339Nano), now.Add(2*time.Nanosecond).Format(time.RFC3339Nano)),
},
},
writeExpected: `null`,
query: client.Query{Command: `select * from "mydb"."myrp".cpu`},
queryExpected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",1.1]]}]}]}`, now.Format(time.RFC3339Nano)),
},
}
@@ -913,32 +949,44 @@ func TestClientLibrary(t *testing.T) {
}
for _, test := range tests {
if test.db == "" {
test.db = "mydb"
}
if test.rp == "" {
test.rp = "myrp"
}
createDatabase(t, testName, nodes, test.db)
createRetentionPolicy(t, testName, nodes, test.db, test.rp)
t.Logf("testing %s - %s\n", testName, test.name)
writeResult, err := c.Write(test.bp)
if test.writeErr != errToString(err) {
t.Errorf("unexpected error. expected: %s, got %v", test.writeErr, err)
}
jsonResult := mustMarshalJSON(writeResult)
if test.writeExpected != jsonResult {
t.Logf("write expected result: %s\n", test.writeExpected)
t.Logf("write got result: %s\n", jsonResult)
t.Error("unexpected results")
}
if test.query.Command != "" {
time.Sleep(500 * time.Millisecond)
queryResult, err := c.Query(test.query)
if test.queryErr != errToString(err) {
t.Errorf("unexpected error. expected: %s, got %v", test.queryErr, err)
for _, w := range test.writes {
writeResult, err := c.Write(w.bp)
if w.err != errToString(err) {
t.Errorf("unexpected error. expected: %s, got %v", w.err, err)
}
jsonResult := mustMarshalJSON(queryResult)
if test.queryExpected != jsonResult {
t.Logf("query expected result: %s\n", test.queryExpected)
t.Logf("query got result: %s\n", jsonResult)
jsonResult := mustMarshalJSON(writeResult)
if w.expected != jsonResult {
t.Logf("write expected result: %s\n", w.expected)
t.Logf("write got result: %s\n", jsonResult)
t.Error("unexpected results")
}
}
for _, q := range test.queries {
if q.query.Command != "" {
time.Sleep(500 * time.Millisecond)
queryResult, err := c.Query(q.query)
if q.err != errToString(err) {
t.Errorf("unexpected error. expected: %s, got %v", q.err, err)
}
jsonResult := mustMarshalJSON(queryResult)
if q.expected != jsonResult {
t.Logf("query expected result: %s\n", q.expected)
t.Logf("query got result: %s\n", jsonResult)
t.Error("unexpected results")
}
}
}
deleteDatabase(t, testName, nodes, test.db)
}
}
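
The 500 ms sleep before each query is the "integration test delay" of the commit title: it gives a write time to propagate before the test reads it back. The graphite tests further down avoid a fixed delay by polling with queryAndWait until the expected result appears. A minimal, self-contained sketch of that polling pattern (pollUntil is an illustrative name, not the repository's helper):

package integration

import "time"

// pollUntil re-runs check every interval until it returns true or the
// deadline passes, reporting whether check ever succeeded. Tests can
// call this instead of sleeping for a fixed period before querying.
func pollUntil(timeout, interval time.Duration, check func() bool) bool {
	deadline := time.Now().Add(timeout)
	for {
		if check() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(interval)
	}
}
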
@@ -992,7 +1040,7 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) {
}
}
func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
if testing.Short() {
t.Skip()
}
@@ -1004,7 +1052,58 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
c := main.NewConfig()
g := main.Graphite{
Enabled: true,
Database: "graphite",
Protocol: "TCP",
Port: 2103,
}
c.Graphites = append(c.Graphites, g)
t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)
createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}
t.Log("Writing data")
data := []byte(`cpu 0.000 `)
data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...)
data = append(data, '\n')
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",0]]}]}]}`, now.Format(time.RFC3339Nano))
// query and wait for results
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}
func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8590
testName := "graphite integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Millisecond)
c := main.NewConfig()
g := main.Graphite{
Enabled: true,
Port: 2203,
Protocol: "TCP",
}
c.Graphites = append(c.Graphites, g)
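
The graphite plaintext protocol written over TCP above is just a metric name, a value, and a timestamp separated by spaces, with the timestamp here in milliseconds. A standalone sketch of composing such a line (illustrative code, not the test's exact helper):

package main

import (
	"fmt"
	"time"
)

// graphiteLine renders the plaintext format the tests write:
// "name value timestamp\n", with the timestamp in milliseconds.
func graphiteLine(name string, value float64, t time.Time) string {
	return fmt.Sprintf("%s %.3f %d\n", name, value, t.UnixNano()/1000000)
}

func main() {
	fmt.Print(graphiteLine("cpu", 0, time.Now().UTC())) // e.g. "cpu 0.000 1426368685000"
}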

View File

@@ -86,12 +86,7 @@ func (p *Parser) Parse(line string) (influxdb.Point, error) {
}
fieldValues := make(map[string]interface{})
// Determine if value is a float or an int.
if i := int64(v); float64(i) == v {
fieldValues[name] = int64(v)
} else {
fieldValues[name] = v
}
fieldValues[name] = v
// Parse timestamp.
unixTime, err := strconv.ParseInt(fields[2], 10, 64)
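
The deleted branch used to downcast integral values to int64; the parser now stores every graphite value as float64 (changelog #1957), so "50" and "50.554" written to the same field can no longer collide on type. A self-contained sketch of the resulting behavior:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Integral and fractional inputs both land in float64 fields now.
	for _, s := range []string{"50", "50.554"} {
		v, err := strconv.ParseFloat(s, 64)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %T %v\n", s, v, v) // 50 -> float64 50, 50.554 -> float64 50.554
	}
}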

View File

@@ -61,9 +61,7 @@ func Test_DecodeMetric(t *testing.T) {
line string
name string
tags map[string]string
isInt bool
iv int64
fv float64
value float64
timestamp time.Time
position, separator string
err string
@@ -73,8 +71,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -83,8 +80,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -93,8 +89,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `foo.bar.cpu 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -103,8 +98,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu 50 ` + strTime,
name: "cpu",
tags: map[string]string{},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -113,8 +107,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu 50 ` + strTime,
name: "cpu",
tags: map[string]string{},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -122,8 +115,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -132,8 +124,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -142,8 +133,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu-foo-bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
@@ -152,8 +142,7 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpuboofooboobar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
@@ -162,16 +151,14 @@ func Test_DecodeMetric(t *testing.T) {
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
value: 50,
timestamp: testTime,
},
{
test: "metric only with float value",
line: `cpu 50.554 ` + strTime,
name: "cpu",
isInt: false,
fv: 50.554,
value: 50.554,
timestamp: testTime,
},
{
@@ -224,16 +211,9 @@ func Test_DecodeMetric(t *testing.T) {
if len(point.Tags) != len(test.tags) {
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags))
}
if test.isInt {
i := point.Fields[point.Name].(int64)
if i != test.iv {
t.Fatalf("integerValue value mismatch. expected %v, got %v", test.iv, point.Fields[point.Name])
}
} else {
f := point.Fields[point.Name].(float64)
if point.Fields[point.Name] != f {
t.Fatalf("floatValue value mismatch. expected %v, got %v", test.fv, f)
}
f := point.Fields[point.Name].(float64)
if f != test.value {
t.Fatalf("floatValue value mismatch. expected %v, got %v", test.value, f)
}
if point.Timestamp.UnixNano()/1000000 != test.timestamp.UnixNano()/1000000 {
t.Fatalf("timestamp value mismatch. expected %v, got %v", test.timestamp.UnixNano(), point.Timestamp.UnixNano())

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"time"
@@ -654,6 +655,8 @@ func (s *SelectStatement) RewriteWildcards(fields Fields, dimensions Dimensions)
for _, f := range s.Fields {
switch f.Expr.(type) {
case *Wildcard:
// Sort wildcard fields for consistent output
sort.Sort(fields)
rwFields = append(rwFields, fields...)
default:
rwFields = append(rwFields, f)
@@ -1573,6 +1576,11 @@ func (f *Field) String() string {
return fmt.Sprintf("%s AS %s", f.Expr.String(), f.Alias)
}
// Sort Interface for Fields
func (f Fields) Len() int { return len(f) }
func (f Fields) Less(i, j int) bool { return f[i].Name() < f[j].Name() }
func (f Fields) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
// Dimensions represents a list of dimensions.
type Dimensions []*Dimension
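
Making Fields satisfy sort.Interface is what lets RewriteWildcards emit wildcard columns in a stable, alphabetical order (changelog #1942), which the updated SELECT * expectation below depends on. The same pattern in toy form (field and fields are illustrative stand-ins, not the influxql types):

package main

import (
	"fmt"
	"sort"
)

type field struct{ name string }

type fields []field

func (f fields) Len() int           { return len(f) }
func (f fields) Less(i, j int) bool { return f[i].name < f[j].name }
func (f fields) Swap(i, j int)      { f[i], f[j] = f[j], f[i] }

func main() {
	fs := fields{{"value"}, {"val-x"}}
	sort.Sort(fs)
	fmt.Println(fs) // [{val-x} {value}]
}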

View File

@@ -1435,8 +1435,8 @@ func (p *Parser) parseOptionalTokenAndInt(t Token) (int, error) {
// Parse number.
n, _ := strconv.ParseInt(lit, 10, 64)
if n < 1 {
msg := fmt.Sprintf("%s must be > 0", t.String())
if n < 0 {
msg := fmt.Sprintf("%s must be >= 0", t.String())
return 0, &ParseError{Message: msg, Pos: pos}
}
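
Relaxing the bound from n &lt; 1 to n &lt; 0 is what makes OFFSET 0 (and LIMIT 0) legal (changelog #1937) while still rejecting negatives. The check, reduced to a self-contained sketch:

package main

import "fmt"

// checkBound mirrors the relaxed validation: zero is accepted for
// LIMIT and OFFSET, only negative values are an error.
func checkBound(tok string, n int64) error {
	if n < 0 {
		return fmt.Errorf("%s must be >= 0", tok)
	}
	return nil
}

func main() {
	fmt.Println(checkBound("OFFSET", 0))  // <nil>
	fmt.Println(checkBound("OFFSET", -1)) // OFFSET must be >= 0
}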

View File

@@ -244,6 +244,18 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.ShowSeriesStatement{},
},
// SHOW SERIES with OFFSET 0
{
s: `SHOW SERIES OFFSET 0`,
stmt: &influxql.ShowSeriesStatement{Offset: 0},
},
// SHOW SERIES with LIMIT 2 OFFSET 0
{
s: `SHOW SERIES LIMIT 2 OFFSET 0`,
stmt: &influxql.ShowSeriesStatement{Offset: 0, Limit: 2},
},
// SHOW SERIES WHERE with ORDER BY and LIMIT
{
s: `SHOW SERIES WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
@@ -766,10 +778,8 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 FROM myseries GROUP`, err: `found EOF, expected BY at line 1, char 35`},
{s: `SELECT field1 FROM myseries LIMIT`, err: `found EOF, expected number at line 1, char 35`},
{s: `SELECT field1 FROM myseries LIMIT 10.5`, err: `fractional parts not allowed in LIMIT at line 1, char 35`},
{s: `SELECT field1 FROM myseries LIMIT 0`, err: `LIMIT must be > 0 at line 1, char 35`},
{s: `SELECT field1 FROM myseries OFFSET`, err: `found EOF, expected number at line 1, char 36`},
{s: `SELECT field1 FROM myseries OFFSET 10.5`, err: `fractional parts not allowed in OFFSET at line 1, char 36`},
{s: `SELECT field1 FROM myseries OFFSET 0`, err: `OFFSET must be > 0 at line 1, char 36`},
{s: `SELECT field1 FROM myseries ORDER`, err: `found EOF, expected BY at line 1, char 35`},
{s: `SELECT field1 FROM myseries ORDER BY /`, err: `found /, expected identifier, ASC, or DESC at line 1, char 38`},
{s: `SELECT field1 FROM myseries ORDER BY 1`, err: `found 1, expected identifier, ASC, or DESC at line 1, char 38`},

View File

@@ -751,6 +751,9 @@ func (s *Server) Databases() (a []string) {
// CreateDatabase creates a new database.
func (s *Server) CreateDatabase(name string) error {
if name == "" {
return ErrDatabaseNameRequired
}
c := &createDatabaseCommand{Name: name}
_, err := s.broadcast(createDatabaseMessageType, c)
return err
@@ -799,6 +802,9 @@ func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) {
// DropDatabase deletes an existing database.
func (s *Server) DropDatabase(name string) error {
if name == "" {
return ErrDatabaseNameRequired
}
c := &dropDatabaseCommand{Name: name}
_, err := s.broadcast(dropDatabaseMessageType, c)
return err
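
CreateDatabase and DropDatabase now validate the name before broadcasting anything to the cluster (changelog #1955), failing fast with ErrDatabaseNameRequired. The guard-clause shape in isolation (ErrNameRequired and the placeholder comment stand in for the server's actual members):

package main

import (
	"errors"
	"fmt"
)

var ErrNameRequired = errors.New("database name required")

// createDatabase checks its input before doing any cluster work,
// mirroring the guards added to the server methods above.
func createDatabase(name string) error {
	if name == "" {
		return ErrNameRequired
	}
	// Broadcast the create-database command to the cluster here.
	return nil
}

func main() {
	fmt.Println(createDatabase(""))    // database name required
	fmt.Println(createDatabase("foo")) // <nil>
}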

View File

@@ -308,6 +308,11 @@ func TestServer_CreateDatabase(t *testing.T) {
s := OpenServer(c)
defer s.Close()
// Attempt creating database without a name
if err := s.CreateDatabase(""); err != influxdb.ErrDatabaseNameRequired {
t.Fatal("expected error on empty database name")
}
// Create the "foo" database.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
@@ -343,6 +348,11 @@ func TestServer_DropDatabase(t *testing.T) {
s := OpenServer(c)
defer s.Close()
// Attempt dropping a database without a name.
if err := s.DropDatabase(""); err != influxdb.ErrDatabaseNameRequired {
t.Fatal("expected error on empty database name")
}
// Create the "foo" database and verify it exists.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
@@ -1664,6 +1674,14 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) {
t.Fatalf("unexpected row count: %d", len(res.Series))
}
// Select data from the server.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 4 OFFSET 0`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
}
// Select data from the server.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
@@ -1794,7 +1812,8 @@ func TestServer_ExecuteWildcardQuery(t *testing.T) {
results := s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during SELECT *: %s", res.Err)
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","value","val-x"],"values":[["2000-01-01T00:00:00Z",10,null],["2000-01-01T00:00:10Z",null,20],["2000-01-01T00:00:20Z",30,40]]}]}` {
} else if s, e := mustMarshalJSON(res), `{"series":[{"name":"cpu","columns":["time","val-x","value"],"values":[["2000-01-01T00:00:00Z",null,10],["2000-01-01T00:00:10Z",20,null],["2000-01-01T00:00:20Z",40,30]]}]}`; s != e {
t.Logf("expected %s\n", e)
t.Fatalf("unexpected results during SELECT *: %s", s)
}
}