validate arguments for aggregate functions
parent
28d53b644f
commit
7de477889b
|
@ -918,6 +918,28 @@ func (s *SelectStatement) validate(tr targetRequirement) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
||||||
|
// First, determine if specific calls have at least one and only one argument
|
||||||
|
for _, f := range s.Fields {
|
||||||
|
if c, ok := f.Expr.(*Call); ok {
|
||||||
|
switch c.Name {
|
||||||
|
case "derivative", "non_negative_derivative":
|
||||||
|
if exp, got := 1, len(c.Args); got < exp {
|
||||||
|
return fmt.Errorf("invalid number of arguments for %s, expected at least %d, got %d", c.Name, exp, got)
|
||||||
|
}
|
||||||
|
case "percentile":
|
||||||
|
if exp, got := 2, len(c.Args); got != exp {
|
||||||
|
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", c.Name, exp, got)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if exp, got := 1, len(c.Args); got != exp {
|
||||||
|
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", c.Name, exp, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, check that we have valid duration and where clauses for aggregates
|
||||||
|
|
||||||
// fetch the group by duration
|
// fetch the group by duration
|
||||||
groupByDuration, _ := s.GroupByInterval()
|
groupByDuration, _ := s.GroupByInterval()
|
||||||
|
|
||||||
|
|
|
@ -13,9 +13,10 @@ func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob {
|
||||||
interval = ", " + interval
|
interval = ", " + interval
|
||||||
}
|
}
|
||||||
|
|
||||||
q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value)%s) FROM foo", fn, interval))
|
s := fmt.Sprintf("SELECT %s(mean(value)%s) FROM foo", fn, interval)
|
||||||
|
q, err := ParseQuery(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to parse query: %s", err)
|
t.Fatalf("failed to parse query %q: %s", s, err)
|
||||||
}
|
}
|
||||||
m := &MapReduceJob{
|
m := &MapReduceJob{
|
||||||
stmt: q.Statements[0].(*SelectStatement),
|
stmt: q.Statements[0].(*SelectStatement),
|
||||||
|
|
|
@ -709,12 +709,12 @@ func TestParser_ParseStatement(t *testing.T) {
|
||||||
|
|
||||||
// CREATE CONTINUOUS QUERY ... INTO <measurement>
|
// CREATE CONTINUOUS QUERY ... INTO <measurement>
|
||||||
{
|
{
|
||||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(5m) END`,
|
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count(field1) INTO measure1 FROM myseries GROUP BY time(5m) END`,
|
||||||
stmt: &influxql.CreateContinuousQueryStatement{
|
stmt: &influxql.CreateContinuousQueryStatement{
|
||||||
Name: "myquery",
|
Name: "myquery",
|
||||||
Database: "testdb",
|
Database: "testdb",
|
||||||
Source: &influxql.SelectStatement{
|
Source: &influxql.SelectStatement{
|
||||||
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count"}}},
|
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}},
|
||||||
Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1"}},
|
Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1"}},
|
||||||
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
|
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
|
||||||
Dimensions: []*influxql.Dimension{
|
Dimensions: []*influxql.Dimension{
|
||||||
|
@ -747,12 +747,12 @@ func TestParser_ParseStatement(t *testing.T) {
|
||||||
|
|
||||||
// CREATE CONTINUOUS QUERY ... INTO <retention-policy>.<measurement>
|
// CREATE CONTINUOUS QUERY ... INTO <retention-policy>.<measurement>
|
||||||
{
|
{
|
||||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO "1h.policy1"."cpu.load" FROM myseries GROUP BY time(5m) END`,
|
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count(field1) INTO "1h.policy1"."cpu.load" FROM myseries GROUP BY time(5m) END`,
|
||||||
stmt: &influxql.CreateContinuousQueryStatement{
|
stmt: &influxql.CreateContinuousQueryStatement{
|
||||||
Name: "myquery",
|
Name: "myquery",
|
||||||
Database: "testdb",
|
Database: "testdb",
|
||||||
Source: &influxql.SelectStatement{
|
Source: &influxql.SelectStatement{
|
||||||
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count"}}},
|
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}},
|
||||||
Target: &influxql.Target{
|
Target: &influxql.Target{
|
||||||
Measurement: &influxql.Measurement{RetentionPolicy: "1h.policy1", Name: "cpu.load"},
|
Measurement: &influxql.Measurement{RetentionPolicy: "1h.policy1", Name: "cpu.load"},
|
||||||
},
|
},
|
||||||
|
@ -1104,6 +1104,8 @@ func TestParser_ParseStatement(t *testing.T) {
|
||||||
{s: `SELECT distinct field1, field2 FROM myseries`, err: `aggregate function distinct() can not be combined with other functions or fields`},
|
{s: `SELECT distinct field1, field2 FROM myseries`, err: `aggregate function distinct() can not be combined with other functions or fields`},
|
||||||
{s: `SELECT count(distinct) FROM myseries`, err: `found ), expected (, identifier at line 1, char 22`},
|
{s: `SELECT count(distinct) FROM myseries`, err: `found ), expected (, identifier at line 1, char 22`},
|
||||||
{s: `SELECT count(distinct field1, field2) FROM myseries`, err: `count(distinct <field>) can only have one argument`},
|
{s: `SELECT count(distinct field1, field2) FROM myseries`, err: `count(distinct <field>) can only have one argument`},
|
||||||
|
{s: `select count(distinct(too, many, arguments))`, err: `found EOF, expected FROM at line 1, char 45`},
|
||||||
|
{s: `select count() from myseries`, err: `invalid number of arguments for count, expected 1, got 0`},
|
||||||
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
|
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
|
||||||
{s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`},
|
{s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`},
|
||||||
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
|
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
|
||||||
|
|
|
@ -1616,7 +1616,7 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
|
||||||
s.SetDefaultRetentionPolicy("foo", "bar")
|
s.SetDefaultRetentionPolicy("foo", "bar")
|
||||||
|
|
||||||
// create and check
|
// create and check
|
||||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count(*) INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error parsing query %s", err.Error())
|
t.Fatalf("error parsing query %s", err.Error())
|
||||||
|
@ -1660,7 +1660,7 @@ func TestServer_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create and check
|
// create and check
|
||||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count(*) INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error parsing query %s", err.Error())
|
t.Fatalf("error parsing query %s", err.Error())
|
||||||
|
@ -1695,7 +1695,7 @@ func TestServer_CreateContinuousQuery_ErrDatabaseNotFound(t *testing.T) {
|
||||||
// create and check
|
// create and check
|
||||||
nonExistentDBName := "bar"
|
nonExistentDBName := "bar"
|
||||||
q := fmt.Sprintf(
|
q := fmt.Sprintf(
|
||||||
"CREATE CONTINUOUS QUERY myquery ON %v BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END",
|
"CREATE CONTINUOUS QUERY myquery ON %v BEGIN SELECT count(*) INTO measure1 FROM myseries GROUP BY time(10m) END",
|
||||||
nonExistentDBName,
|
nonExistentDBName,
|
||||||
)
|
)
|
||||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||||
|
@ -1740,7 +1740,7 @@ func TestServer_CreateContinuousQuery_ErrRetentionPolicyNotFound(t *testing.T) {
|
||||||
retentionPolicy := "1h"
|
retentionPolicy := "1h"
|
||||||
// create and check
|
// create and check
|
||||||
q := fmt.Sprintf(
|
q := fmt.Sprintf(
|
||||||
"CREATE CONTINUOUS QUERY myquery ON %v BEGIN SELECT count() INTO \"%v.\".\"measure1\" FROM myseries GROUP BY time(10m) END",
|
"CREATE CONTINUOUS QUERY myquery ON %v BEGIN SELECT count(*) INTO \"%v.\".\"measure1\" FROM myseries GROUP BY time(10m) END",
|
||||||
database,
|
database,
|
||||||
retentionPolicy,
|
retentionPolicy,
|
||||||
)
|
)
|
||||||
|
@ -1776,7 +1776,7 @@ func TestServer_DropContinuousQuery(t *testing.T) {
|
||||||
s.SetDefaultRetentionPolicy("foo", "bar")
|
s.SetDefaultRetentionPolicy("foo", "bar")
|
||||||
|
|
||||||
// create and check
|
// create and check
|
||||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count(*) INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error parsing query %s", err.Error())
|
t.Fatalf("error parsing query %s", err.Error())
|
||||||
|
@ -1917,7 +1917,7 @@ func TestServer_ShowContinuousQueriesStatement(t *testing.T) {
|
||||||
s.SetDefaultRetentionPolicy("foo", "bar")
|
s.SetDefaultRetentionPolicy("foo", "bar")
|
||||||
|
|
||||||
// create and check
|
// create and check
|
||||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count(*) INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error parsing query %s", err.Error())
|
t.Fatalf("error parsing query %s", err.Error())
|
||||||
|
@ -1933,7 +1933,7 @@ func TestServer_ShowContinuousQueriesStatement(t *testing.T) {
|
||||||
if results.Error() != nil {
|
if results.Error() != nil {
|
||||||
t.Fatalf("unexpected error: %s", results.Error())
|
t.Fatalf("unexpected error: %s", results.Error())
|
||||||
}
|
}
|
||||||
expected := `{"series":[{"name":"foo","columns":["name","query"],"values":[["myquery","CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"]]}]}`
|
expected := `{"series":[{"name":"foo","columns":["name","query"],"values":[["myquery","CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count(*) INTO measure1 FROM myseries GROUP BY time(10m) END"]]}]}`
|
||||||
|
|
||||||
if res := results.Results[0]; res.Err != nil {
|
if res := results.Results[0]; res.Err != nil {
|
||||||
t.Errorf("unexpected error: %s", res.Err)
|
t.Errorf("unexpected error: %s", res.Err)
|
||||||
|
|
Loading…
Reference in New Issue