Merge pull request #2756 from influxdb/integration-tests

And even more Integration tests
pull/2763/head
Cory LaNou 2015-06-03 11:56:36 -06:00
commit f5d59eca3d
4 changed files with 616 additions and 179 deletions

View File

@ -56,6 +56,11 @@ func (s *Service) Open() error {
return nil
}
// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}
// serve accepts connections from the listener and handles them.
func (s *Service) serve() {
defer s.wg.Done()

View File

@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
@ -33,7 +34,7 @@ func TestServer_DatabaseCommands(t *testing.T) {
exp: `{"results":[{}]}`,
},
&Query{
name: "create database should fail if it already exists",
name: "create database should error if it already exists",
command: `CREATE DATABASE db0`,
exp: `{"results":[{"error":"database already exists"}]}`,
},
@ -45,7 +46,7 @@ func TestServer_DatabaseCommands(t *testing.T) {
},
&Query{
skip: true,
name: "drop database should fail if it doesn't exist - FIXME pauldix",
name: "drop database should error if it doesn't exist - FIXME pauldix",
command: `DROP DATABASE db0`,
exp: `FIXME`,
},
@ -57,10 +58,10 @@ func TestServer_DatabaseCommands(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := s.Execute(query); err != nil {
t.Fatal(query.Error(err))
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Fatal(query.failureMessage())
t.Error(query.failureMessage())
}
}
}
@ -84,7 +85,7 @@ func TestServer_RetentionPolicyCommands(t *testing.T) {
exp: `{"results":[{}]}`,
},
&Query{
name: "create retention policy should fail if it already exists",
name: "create retention policy should error if it already exists",
command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`,
exp: `{"results":[{"error":"retention policy already exists"}]}`,
},
@ -121,10 +122,10 @@ func TestServer_RetentionPolicyCommands(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := s.Execute(query); err != nil {
t.Fatal(query.Error(err))
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Fatal(query.failureMessage())
t.Error(query.failureMessage())
}
}
}
@ -254,6 +255,61 @@ func TestServer_Write_LineProtocol_Integer(t *testing.T) {
}
}
// Ensure the server can query with default databases (via param) and default retention policy
func TestServer_Query_DefaultDBAndRP(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
test := NewTest("db0", "rp0")
test.write = fmt.Sprintf(`cpu value=1.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano())
test.addQueries([]*Query{
&Query{
name: "default db and rp",
params: url.Values{"db": []string{"db0"}},
command: `SELECT * FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T01:00:00Z",1]]}]}]}`,
},
&Query{
skip: true,
name: "default rp - FIXME pauldix",
command: `SELECT * FROM db0..cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T01:00:00Z",1]]}]}]}`,
},
&Query{
name: "default dp",
params: url.Values{"db": []string{"db0"}},
command: `SELECT * FROM rp0.cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T01:00:00Z",1]]}]}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// Ensure the server can query with the count aggregate function
func TestServer_Query_Count(t *testing.T) {
t.Parallel()
@ -265,17 +321,37 @@ func TestServer_Query_Count(t *testing.T) {
}
now := time.Now().UTC()
if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=1.0 `+strconv.FormatInt(now.UnixNano(), 10)); err != nil {
t.Fatal(err)
} else if exp := ``; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
test := NewTest("db0", "rp0")
test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10)
test.addQueries([]*Query{
&Query{
name: "selecting count(value) should succeed",
command: `SELECT count(value) FROM db0.rp0.cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
},
&Query{
name: "selecting count(*) should error",
command: `SELECT count(*) FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"expected field argument in count()"}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
// Verify the data was written.
if res, err := s.Query(`SELECT count(value) FROM db0.rp0.cpu`); err != nil {
t.Fatal(err)
} else if exp := `{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
@ -290,17 +366,38 @@ func TestServer_Query_Now(t *testing.T) {
}
now := time.Now().UTC()
if res, err := s.Write("db0", "rp0", `cpu,host=server01 value=1.0 `+strconv.FormatInt(now.UnixNano(), 10)); err != nil {
t.Fatal(err)
} else if exp := ``; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
test := NewTest("db0", "rp0")
test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10)
test.addQueries([]*Query{
&Query{
name: "where with time < now() should work",
command: `SELECT * FROM db0.rp0.cpu where time < now()`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)),
},
&Query{
skip: true,
name: "where with time > now() should return an empty result - FIXME pauldix",
command: `SELECT * FROM db0.rp0.cpu where time > now()`,
exp: `{"results":[{}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
// Verify the data was written.
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu where time < now()`); err != nil {
t.Fatal(err)
} else if exp := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)); exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
@ -314,55 +411,52 @@ func TestServer_Query_EpochPrecision(t *testing.T) {
t.Fatal(err)
}
now := now()
test := Test{
db: "db0",
rp: "rp0",
write: `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10),
queries: []*Query{
&Query{
name: "nanosecond precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"n"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()),
},
&Query{
name: "microsecond precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"u"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Microsecond)),
},
&Query{
name: "millisecond precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"ms"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Millisecond)),
},
&Query{
name: "second precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"s"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Second)),
},
&Query{
name: "minute precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"m"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Minute)),
},
&Query{
name: "hour precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"h"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Hour)),
},
},
}
now := time.Now().UTC()
if res, err := s.Write(test.db, test.rp, test.write); err != nil {
t.Fatal(err)
} else if test.exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", test.exp, res)
test := NewTest("db0", "rp0")
test.write = `cpu,host=server01 value=1.0 ` + strconv.FormatInt(now.UnixNano(), 10)
test.addQueries([]*Query{
&Query{
name: "nanosecond precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"n"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()),
},
&Query{
name: "microsecond precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"u"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Microsecond)),
},
&Query{
name: "millisecond precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"ms"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Millisecond)),
},
&Query{
name: "second precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"s"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Second)),
},
&Query{
name: "minute precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"m"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Minute)),
},
&Query{
name: "hour precision",
command: `SELECT * FROM db0.rp0.cpu`,
params: url.Values{"epoch": []string{"h"}},
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[%d,1]]}]}]}`, now.UnixNano()/int64(time.Hour)),
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
@ -370,10 +464,10 @@ func TestServer_Query_EpochPrecision(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := s.Execute(query); err != nil {
t.Fatal(query.Error(err))
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Fatal(query.failureMessage())
t.Error(query.failureMessage())
}
}
}
@ -388,39 +482,41 @@ func TestServer_Query_Tags(t *testing.T) {
t.Fatal(err)
}
now := now()
test := Test{
db: "db0",
rp: "rp0",
write: fmt.Sprintf("cpu,host=server01 value=100,core=4 %s\ncpu,host=server02 value=50,core=2 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10)),
queries: []*Query{
&Query{
name: "tag without field should return error",
command: `SELECT host FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"select statement must include at least one field or function call"}]}`,
},
&Query{
name: "field with tag should succeed",
command: `SELECT host, value FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",100]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["%s",50]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
&Query{
name: "field with two tags should succeed",
command: `SELECT host, value, core FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value","core"],"values":[["%s",100,4]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value","core"],"values":[["%s",50,2]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
&Query{
name: "select * with tags should succeed",
command: `SELECT * FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","core","value"],"values":[["%s",4,100]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","core","value"],"values":[["%s",2,50]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
},
}
now := time.Now().UTC()
if res, err := s.Write(test.db, test.rp, test.write); err != nil {
t.Fatal(err)
} else if test.exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", test.exp, res)
test := NewTest("db0", "rp0")
test.write = fmt.Sprintf("cpu,host=server01 value=100,core=4 %s\ncpu,host=server02 value=50,core=2 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10))
test.addQueries([]*Query{
&Query{
name: "tag without field should return error",
command: `SELECT host FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"select statement must include at least one field or function call"}]}`,
},
&Query{
name: "field with tag should succeed",
command: `SELECT host, value FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",100]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["%s",50]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
&Query{
name: "field with two tags should succeed",
command: `SELECT host, value, core FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value","core"],"values":[["%s",100,4]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value","core"],"values":[["%s",50,2]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
&Query{
name: "select * with tags should succeed",
command: `SELECT * FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","core","value"],"values":[["%s",4,100]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","core","value"],"values":[["%s",2,50]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
&Query{
name: "group by tag",
command: `SELECT value FROM db0.rp0.cpu GROUP by host`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",100]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["%s",50]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
@ -428,10 +524,10 @@ func TestServer_Query_Tags(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := s.Execute(query); err != nil {
t.Fatal(query.Error(err))
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Fatal(query.failureMessage())
t.Error(query.failureMessage())
}
}
}
@ -446,41 +542,38 @@ func TestServer_Query_Common(t *testing.T) {
t.Fatal(err)
}
now := now()
test := Test{
db: "db0",
rp: "rp0",
write: fmt.Sprintf("cpu,host=server01 value=1 %s", strconv.FormatInt(now.UnixNano(), 10)),
queries: []*Query{
&Query{
name: "selecting a valid measurement and field should succeed",
command: `SELECT value FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)),
},
&Query{
name: "selecting a measurement that doesn't exist should fail",
command: `SELECT value FROM db0.rp0.idontexist`,
exp: `.*measurement not found*`,
pattern: true,
},
&Query{
name: "selecting a field that doesn't exist should fail",
command: `SELECT idontexist FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"unknown field or tag name in select clause: idontexist"}]}`,
},
&Query{
skip: true,
name: "no results should return an empty result - FIXME pauldix",
command: `SELECT value FROM db0.rp0.cpu where time > now()`,
exp: `{"results":[{}]}`,
},
},
}
now := time.Now().UTC()
if res, err := s.Write(test.db, test.rp, test.write); err != nil {
t.Fatal(err)
} else if test.exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", test.exp, res)
test := NewTest("db0", "rp0")
test.write = fmt.Sprintf("cpu,host=server01 value=1 %s", strconv.FormatInt(now.UnixNano(), 10))
test.addQueries([]*Query{
&Query{
name: "selecting a valid measurement and field should succeed",
command: `SELECT value FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)),
},
&Query{
name: "selecting a measurement that doesn't exist should error",
command: `SELECT value FROM db0.rp0.idontexist`,
exp: `.*measurement not found*`,
pattern: true,
},
&Query{
name: "selecting a field that doesn't exist should error",
command: `SELECT idontexist FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"unknown field or tag name in select clause: idontexist"}]}`,
},
&Query{
skip: true,
name: "no results should return an empty result - FIXME pauldix",
command: `SELECT value FROM db0.rp0.cpu where time > now()`,
exp: `{"results":[{}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
@ -488,13 +581,12 @@ func TestServer_Query_Common(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := s.Execute(query); err != nil {
t.Fatal(query.Error(err))
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Fatal(query.failureMessage())
t.Error(query.failureMessage())
}
}
}
// Ensure the server can query two points.
@ -503,25 +595,302 @@ func TestServer_Query_SelectTwoPoints(t *testing.T) {
s := OpenServer(NewConfig(), "")
defer s.Close()
// Create the database.
if _, err := s.MetaStore.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.MetaStore.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}
now := time.Now().UTC()
if res, err := s.Write("db0", "rp0", fmt.Sprintf("cpu value=100 %s\ncpu value=200 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10))); err != nil {
now := now()
test := NewTest("db0", "rp0")
test.write = fmt.Sprintf("cpu value=100 %s\ncpu value=200 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10))
test.addQueries(&Query{
name: "selecting two points should result in two points",
command: `SELECT * FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100],["%s",200]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
})
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// Ensure the server can query two negative points.
func TestServer_Query_SelectTwoNegativePoints(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
} else if exp := ``; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
}
// Verify the data was written.
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu`); err != nil {
now := now()
test := NewTest("db0", "rp0")
test.write = fmt.Sprintf("cpu value=-100 %s\ncpu value=-200 %s", strconv.FormatInt(now.UnixNano(), 10), strconv.FormatInt(now.Add(1).UnixNano(), 10))
test.addQueries(&Query{
name: "selecting two negative points should succeed",
command: `SELECT * FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",-100],["%s",-200]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
})
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// Ensure the server can query with relative time.
func TestServer_Query_SelectRelativeTime(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
} else if exp := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100],["%s",200]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)); exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
}
now := now()
yesterday := yesterday()
test := NewTest("db0", "rp0")
test.write = fmt.Sprintf("cpu,host=server01 value=100 %s\ncpu,host=server01 value=200 %s", strconv.FormatInt(yesterday.UnixNano(), 10), strconv.FormatInt(now.UnixNano(), 10))
test.addQueries([]*Query{
&Query{
name: "single point with time pre-calculated for past time queries yesterday",
command: `SELECT * FROM db0.rp0.cpu where time >= '` + yesterday.Add(-1*time.Minute).Format(time.RFC3339Nano) + `'`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",100],["%s",200]]}]}]}`, yesterday.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano)),
},
&Query{
name: "single point with time pre-calculated for relative time queries now",
command: `SELECT * FROM db0.rp0.cpu where time >= now() - 1m`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",200]]}]}]}`, now.Format(time.RFC3339Nano)),
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// mergeMany ensures that when merging many series together and some of them have a different number
// of points than others in a group by interval the results are correct
func TestServer_Query_MergeMany(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
// set infinite retention policy as we are inserting data in the past and don't want retention policy enforcement to make this test racy
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
test := NewTest("db0", "rp0")
writes := []string{}
for i := 1; i < 11; i++ {
for j := 1; j < 5+i%3; j++ {
data := fmt.Sprintf(`cpu,host=server_%d value=22 %d`, i, time.Unix(int64(j), int64(0)).UTC().UnixNano())
writes = append(writes, data)
}
}
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{
&Query{
name: "GROUP by time",
command: `SELECT count(value) FROM db0.rp0.cpu WHERE time >= '1970-01-01T00:00:01Z' AND time <= '1970-01-01T00:00:06Z' GROUP BY time(1s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:01Z",10],["1970-01-01T00:00:02Z",10],["1970-01-01T00:00:03Z",10],["1970-01-01T00:00:04Z",10],["1970-01-01T00:00:05Z",7],["1970-01-01T00:00:06Z",3]]}]}]}`,
},
&Query{
skip: true,
name: "GROUP by tag - FIXME pauldix",
command: `SELECT count(value) FROM db0.rp0.cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:00Z' group by host`,
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","count"],"values":[["2000-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","count"],"values":[["2000-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server03"},"columns":["time","count"],"values":[["2000-01-01T00:00:00Z",1]]}]}]}`,
},
&Query{
name: "GROUP by field",
command: `SELECT count(value) FROM db0.rp0.cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:00Z' group by value`,
exp: `{"results":[{"error":"can not use field in group by clause: value"}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
func TestServer_Query_LimitAndOffset(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
// set infinite retention policy as we are inserting data in the past and don't want retention policy enforcement to make this test racy
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
test := NewTest("db0", "rp0")
writes := []string{}
for i := 1; i < 10; i++ {
data := fmt.Sprintf(`cpu,region=us-east,host=server-%d value=%d %d`, i, i, time.Unix(int64(i), int64(0)).UnixNano())
writes = append(writes, data)
}
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{
&Query{
name: "SLIMIT 2 SOFFSET 1",
command: `SELECT count(value) FROM db0.rp0.cpu GROUP BY * SLIMIT 2 SOFFSET 1`,
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server-2","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server-3","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
},
&Query{
name: "SLIMIT 2 SOFFSET 3",
command: `SELECT count(value) FROM db0.rp0.cpu GROUP BY * SLIMIT 2 SOFFSET 3`,
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server-4","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server-5","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
},
&Query{
name: "SLIMIT 3 SOFFSET 8",
command: `SELECT count(value) FROM db0.rp0.cpu GROUP BY * SLIMIT 3 SOFFSET 8`,
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server-9","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
func TestServer_Query_Regex(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`cpu1,host=server01 value=10 %d`, mustParseTime(time.RFC3339Nano, "2015-02-28T01:03:36.703820946Z").UnixNano()),
fmt.Sprintf(`cpu2,host=server01 value=20 %d`, mustParseTime(time.RFC3339Nano, "2015-02-28T01:03:36.703820946Z").UnixNano()),
fmt.Sprintf(`cpu3,host=server01 value=30 %d`, mustParseTime(time.RFC3339Nano, "2015-02-28T01:03:36.703820946Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{
&Query{
name: "default db and rp",
command: `SELECT * FROM /cpu[13]/`,
params: url.Values{"db": []string{"db0"}},
exp: `{"results":[{"series":[{"name":"cpu1","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",10]]},{"name":"cpu3","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",30]]}]}]}`,
},
&Query{
name: "specifying db and rp",
command: `SELECT * FROM db0.rp0./cpu[13]/`,
exp: `{"results":[{"series":[{"name":"cpu1","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",10]]},{"name":"cpu3","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",30]]}]}]}`,
},
&Query{
name: "default db and specified rp",
command: `SELECT * FROM rp0./cpu[13]/`,
params: url.Values{"db": []string{"db0"}},
exp: `{"results":[{"series":[{"name":"cpu1","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",10]]},{"name":"cpu3","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",30]]}]}]}`,
},
&Query{
skip: true,
name: "specified db and default rp - FIXME pauldix",
command: `SELECT * FROM db0../cpu[13]/`,
exp: `{"results":[{"series":[{"name":"cpu1","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",10]]},{"name":"cpu3","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",30]]}]}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
@ -533,10 +902,27 @@ type Server struct {
// NewServer returns a new instance of Server.
func NewServer(c *run.Config, joinURLs string) *Server {
return &Server{
s := Server{
Server: run.NewServer(c, joinURLs),
Config: c,
}
// Set the logger to discard unless verbose is on
if !testing.Verbose() {
type logSetter interface {
SetLogger(*log.Logger)
}
nullLogger := log.New(ioutil.Discard, "", 0)
s.MetaStore.Logger = nullLogger
s.TSDBStore.Logger = nullLogger
for _, service := range s.Services {
if service, ok := service.(logSetter); ok {
service.SetLogger(nullLogger)
}
}
}
return &s
}
// OpenServer opens a test server.
@ -545,6 +931,7 @@ func OpenServer(c *run.Config, joinURLs string) *Server {
if err := s.Open(); err != nil {
panic(err.Error())
}
return s
}
@ -575,16 +962,6 @@ func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionP
return nil
}
// Execute takes a Query and executes it
func (s *Server) Execute(q *Query) (err error) {
if q.params == nil {
q.act, err = s.Query(q.command)
return
}
q.act, err = s.QueryWithParams(q.command, q.params)
return
}
// Query executes a query against the server and returns the results.
func (s *Server) Query(query string) (results string, err error) {
return s.QueryWithParams(query, nil)
@ -632,6 +1009,7 @@ func NewConfig() *run.Config {
c.HTTPD.Enabled = true
c.HTTPD.BindAddress = "127.0.0.1:0"
c.HTTPD.LogEnabled = testing.Verbose()
return c
}
@ -643,6 +1021,18 @@ func now() time.Time {
return time.Now().UTC()
}
func yesterday() time.Time {
return now().Add(-1 * time.Hour * 24)
}
func mustParseTime(layout, value string) time.Time {
tm, err := time.Parse(layout, value)
if err != nil {
panic(err)
}
return tm
}
// MustReadAll reads r. Panic on error.
func MustReadAll(r io.Reader) []byte {
b, err := ioutil.ReadAll(r)
@ -680,6 +1070,16 @@ type Query struct {
skip bool
}
// Execute runs the command and returns an err if it fails
func (q *Query) Execute(s *Server) (err error) {
if q.params == nil {
q.act, err = s.Query(q.command)
return
}
q.act, err = s.QueryWithParams(q.command, q.params)
return
}
func (q *Query) success() bool {
if q.pattern {
return expectPattern(q.exp, q.act)
@ -696,9 +1096,34 @@ func (q *Query) failureMessage() string {
}
type Test struct {
write string
db string
rp string
exp string
queries []*Query
initialized bool
write string
db string
rp string
exp string
queries []*Query
}
func NewTest(db, rp string) Test {
return Test{
db: db,
rp: rp,
}
}
func (t *Test) addQueries(q ...*Query) {
t.queries = append(t.queries, q...)
}
func (t *Test) init(s *Server) error {
if t.write == "" || t.initialized {
return nil
}
t.initialized = true
if res, err := s.Write(t.db, t.rp, t.write); err != nil {
return err
} else if t.exp != res {
return fmt.Errorf("unexpected results\nexp: %s\ngot: %s\n", t.exp, res)
}
return nil
}

View File

@ -60,6 +60,11 @@ func (s *Service) Close() error {
return nil
}
// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}
// Err returns a channel for fatal errors that occur on the listener.
func (s *Service) Err() <-chan error { return s.err }

View File

@ -188,6 +188,8 @@ func (s *Store) Open() error {
}
func (s *Store) WriteToShard(shardID uint64, points []Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
sh, ok := s.shards[shardID]
if !ok {
return ErrShardNotFound