Merge pull request #3930 from influxdb/top

Wire up Top aggregate function
pull/3996/head
Cory LaNou 2015-09-04 13:57:07 -05:00
commit 1af8dd81af
13 changed files with 1441 additions and 230 deletions

.gitignore

@ -69,3 +69,6 @@ integration/migration_data/
# goconvey config files
*.goconvey
# Ignore SourceGraph directory
.srclib-store/


@ -11,6 +11,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc
- [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821)
### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.


@ -333,6 +333,7 @@ func configureLogging(s *Server) {
s.MetaStore.Logger = nullLogger
s.TSDBStore.Logger = nullLogger
s.HintedHandoff.SetLogger(nullLogger)
s.Monitor.SetLogger(nullLogger)
for _, service := range s.Services {
if service, ok := service.(logSetter); ok {
service.SetLogger(nullLogger)


@ -993,7 +993,7 @@ func TestServer_Query_Count(t *testing.T) {
&Query{
name: "selecting count(*) should error",
command: `SELECT count(*) FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"expected field argument in count()"}]}`,
exp: `{"error":"error parsing query: expected field argument in count()"}`,
},
}...)
@ -2229,6 +2229,193 @@ func TestServer_Query_Aggregates(t *testing.T) {
}
}
func TestServer_Query_AggregatesTopInt(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
writes := []string{
// cpu data with overlapping duplicate values
// hour 0
fmt.Sprintf(`cpu,host=server01 value=2.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 value=3.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server03 value=4.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
// hour 1
fmt.Sprintf(`cpu,host=server04 value=5.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server05 value=7.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server06 value=6.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:20Z").UnixNano()),
// hour 2
fmt.Sprintf(`cpu,host=server07 value=7.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server08 value=9.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:10Z").UnixNano()),
// memory data
// hour 0
fmt.Sprintf(`memory,host=a,service=redis value=1000i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=mysql value=2000i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=redis value=1500i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
// hour 1
fmt.Sprintf(`memory,host=a,service=redis value=1001i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=mysql value=2001i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=redis value=1501i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
// hour 2
fmt.Sprintf(`memory,host=a,service=redis value=1002i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=mysql value=2002i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=redis value=1502i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{
&Query{
name: "top - cpu",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 1) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 2 values",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 3 values - sorts on tie properly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - with tag",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top","host"],"values":[["2000-01-01T01:00:10Z",7,"server05"],["2000-01-01T02:00:10Z",9,"server08"]]}]}]}`,
},
&Query{
name: "top - cpu - 3 values with limit 2",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3) FROM cpu limit 2`,
exp: `{"error":"error parsing query: limit (3) in top function can not be larger than the LIMIT (2) in the select statement"}`,
},
&Query{
name: "top - cpu - hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 1) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T01:00:00Z",7],["2000-01-01T02:00:00Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - time specified - hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT time, TOP(value, 1) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - time specified (not first) - hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 1), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 2 values hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T00:00:00Z",3],["2000-01-01T01:00:00Z",7],["2000-01-01T01:00:00Z",6],["2000-01-01T02:00:00Z",9],["2000-01-01T02:00:00Z",7]]}]}]}`,
},
&Query{
name: "top - cpu - time specified - 2 values hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:10Z",3],["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T01:00:20Z",6],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 3 values hourly - validates that a bucket can have less than limit if no values exist in that time bucket",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:00Z",2],["2000-01-01T01:00:00Z",7],["2000-01-01T01:00:00Z",6],["2000-01-01T01:00:00Z",5],["2000-01-01T02:00:00Z",9],["2000-01-01T02:00:00Z",7]]}]}]}`,
},
&Query{
name: "top - cpu - time specified - 3 values hourly - validates that a bucket can have less than limit if no values exist in that time bucket",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",2],["2000-01-01T00:00:10Z",3],["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:00Z",5],["2000-01-01T01:00:10Z",7],["2000-01-01T01:00:20Z",6],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - memory - 2 values, two tags",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2), host, service FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T01:00:00Z",2001,"b","mysql"],["2000-01-01T02:00:00Z",2002,"b","mysql"]]}]}]}`,
},
&Query{
name: "top - memory - host tag with limit 2",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2) FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host"],"values":[["2000-01-01T02:00:00Z",2002,"b"],["2000-01-01T02:00:00Z",1002,"a"]]}]}]}`,
},
&Query{
name: "top - memory - host tag with limit 2, service tag in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2), service FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`,
},
&Query{
name: "top - memory - service tag with limit 2, host tag in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, service, 2), host FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","service","host"],"values":[["2000-01-01T02:00:00Z",2002,"mysql","b"],["2000-01-01T02:00:00Z",1502,"redis","b"]]}]}]}`,
},
&Query{
name: "top - memory - host and service tag with limit 2",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, service, 2) FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1502,"b","redis"]]}]}]}`,
},
&Query{
name: "top - memory - host tag with limit 2 with service tag in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2), service FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`,
},
&Query{
name: "top - memory - host and service tag with limit 3",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, service, 3) FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1502,"b","redis"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`,
},
// TODO
// - Test that specifying fields or tags in the function will rewrite the query to expand them to the fields
// - Test that a field can be used in the top function
// - Test that a field comes back before a tag when a tag and a field share the same name
// - Test that `select top(value, host, 2)` brings back only one value when there is only one value for `host`
// - Test that `select top(value, host, 4) from foo where time > now() - 1d and time < now() group by time(1h)` returns only the unique values in time buckets where host is unique, and not always 4 values
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
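As an aside, every query above goes through the standard HTTP /query endpoint that the test harness drives. A minimal standalone sketch of the same call, assuming a local server with db0 populated as in this test (hypothetical host and port):

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	v := url.Values{}
	v.Set("db", "db0")
	v.Set("q", `SELECT TOP(value, host, 2) FROM cpu`)
	resp, err := http.Get("http://localhost:8086/query?" + v.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	// Prints JSON shaped like the exp strings above.
	fmt.Println(string(body))
}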
// Test various aggregates when different series only have data for the same timestamp.
func TestServer_Query_AggregatesIdenticalTime(t *testing.T) {
t.Parallel()


@ -9,6 +9,8 @@ import (
"strconv"
"strings"
"time"
"github.com/influxdb/influxdb/pkg/slices"
)
// DataType represents the primitive data types available in InfluxQL.
@ -857,6 +859,48 @@ func (s *SelectStatement) RewriteDistinct() {
}
}
// ColumnNames will walk all fields and functions and return the appropriate field names for the select statement
// while maintaining the order of the field names
func (s *SelectStatement) ColumnNames() []string {
// Always set the first column to be time, even if they didn't specify it
columnNames := []string{"time"}
// First walk each field
for _, field := range s.Fields {
switch f := field.Expr.(type) {
case *Call:
if f.Name == "top" || f.Name == "bottom" {
if len(f.Args) == 2 {
columnNames = append(columnNames, f.Name)
continue
}
// Special case: we also have to add the column names for the fields that TOP or BOTTOM asked for
columnNames = slices.Union(columnNames, f.Fields(), true)
continue
}
columnNames = append(columnNames, field.Name())
default:
// time is always first, and we already added it, so ignore it if they asked for it anywhere else.
if field.Name() != "time" {
columnNames = append(columnNames, field.Name())
}
}
}
return columnNames
}
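For illustration, a minimal sketch (not part of this diff) of the resulting column order, using the influxql.MustParseStatement helper that the meta statement executor tests below also use; the output matches the ["time","top","host"] columns asserted in the integration tests above:

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	stmt := influxql.MustParseStatement(`SELECT TOP(value, host, 2) FROM cpu`)
	sel := stmt.(*influxql.SelectStatement)
	// time is always first, then the call name, then the tags it asked for.
	fmt.Println(sel.ColumnNames()) // [time top host]
}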
// HasTimeFieldSpecified will walk all fields and determine if the user explicitly asked for time
// This is needed to determine re-write behaviors for functions like TOP and BOTTOM
func (s *SelectStatement) HasTimeFieldSpecified() bool {
for _, f := range s.Fields {
if f.Name() == "time" {
return true
}
}
return false
}
// String returns a string representation of the select statement.
func (s *SelectStatement) String() string {
var buf bytes.Buffer
@ -1029,40 +1073,96 @@ func (s *SelectStatement) validateFields() error {
return nil
}
// validSelectWithAggregate determines if a SELECT statement has the correct
// mix of aggregate functions and selected fields and tags
// Currently we don't have support for all aggregates, but aggregates that
// can be combined with fields/tags are:
// TOP, BOTTOM, MAX, MIN, FIRST, LAST
func (s *SelectStatement) validSelectWithAggregate(numAggregates int) error {
if numAggregates != 0 && numAggregates != len(s.Fields) {
return fmt.Errorf("mixing aggregate and non-aggregate queries is not supported")
}
return nil
}
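For illustration, with the per-function checks below:

	SELECT count(value), host FROM cpu   -- rejected: mixing aggregate and non-aggregate queries is not supported
	SELECT top(value, 2), host FROM cpu  -- accepted: the top/bottom case never invokes this check, so TOP may mix with tags and fields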
func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
// First, if one field is an aggregate, then all fields must be aggregates. This is
// an explicit limitation of the current system.
// Currently most aggregates must be the ONLY thing in a select statement.
// Others, like TOP/BOTTOM, can mix aggregates and tags/fields.
numAggregates := 0
for _, f := range s.Fields {
if _, ok := f.Expr.(*Call); ok {
numAggregates++
}
}
if numAggregates != 0 && numAggregates != len(s.Fields) {
return fmt.Errorf("mixing aggregate and non-aggregate queries is not supported")
}
// Secondly, determine if specific calls have the correct number and type of arguments
for _, f := range s.Fields {
if c, ok := f.Expr.(*Call); ok {
switch c.Name {
switch expr := f.Expr.(type) {
case *Call:
switch expr.Name {
case "derivative", "non_negative_derivative":
if min, max, got := 1, 2, len(c.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", c.Name, min, max, got)
if err := s.validSelectWithAggregate(numAggregates); err != nil {
return err
}
if min, max, got := 1, 2, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
}
case "percentile":
if exp, got := 2, len(c.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", c.Name, exp, got)
if err := s.validSelectWithAggregate(numAggregates); err != nil {
return err
}
if exp, got := 2, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
}
_, ok := expr.Args[1].(*NumberLiteral)
if !ok {
return fmt.Errorf("expected float argument in percentile()")
}
case "top", "bottom":
if exp, got := 2, len(expr.Args); got < exp {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d, got %d", expr.Name, exp, got)
}
if len(expr.Args) > 1 {
callLimit, ok := expr.Args[len(expr.Args)-1].(*NumberLiteral)
if !ok {
return fmt.Errorf("expected integer as last argument in %s(), found %s", expr.Name, expr.Args[len(expr.Args)-1])
}
// Check if they asked for a limit smaller than what they passed into the call
if int64(callLimit.Val) > int64(s.Limit) && s.Limit != 0 {
return fmt.Errorf("limit (%d) in %s function can not be larger than the LIMIT (%d) in the select statement", int64(callLimit.Val), expr.Name, int64(s.Limit))
}
for _, v := range expr.Args[:len(expr.Args)-1] {
if _, ok := v.(*VarRef); !ok {
return fmt.Errorf("only fields or tags are allowed in %s(), found %s", expr.Name, v)
}
}
}
default:
if exp, got := 1, len(c.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", c.Name, exp, got)
if err := s.validSelectWithAggregate(numAggregates); err != nil {
return err
}
if exp, got := 1, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
}
switch fc := expr.Args[0].(type) {
case *VarRef:
// do nothing
case *Call:
if fc.Name != "distinct" {
return fmt.Errorf("expected field argument in %s()", expr.Name)
}
case *Distinct:
if expr.Name != "count" {
return fmt.Errorf("expected field argument in %s()", expr.Name)
}
default:
return fmt.Errorf("expected field argument in %s()", expr.Name)
}
}
}
}
// Now, check that we have valid duration and where clauses for aggregates
// Check that we have valid duration and where clauses for aggregates
// fetch the group by duration
groupByDuration, _ := s.GroupByInterval()
@ -2241,6 +2341,33 @@ func (c *Call) String() string {
return fmt.Sprintf("%s(%s)", c.Name, strings.Join(str, ", "))
}
// Fields will extract any field names from the call. Only specific calls support this.
func (c *Call) Fields() []string {
switch c.Name {
case "top", "bottom":
// maintain the order the user specified in the query
keyMap := make(map[string]struct{})
keys := []string{}
for i, a := range c.Args {
if i == 0 {
// special case, first argument is always the name of the function regardless of the field name
keys = append(keys, c.Name)
continue
}
switch v := a.(type) {
case *VarRef:
if _, ok := keyMap[v.Val]; !ok {
keyMap[v.Val] = struct{}{}
keys = append(keys, v.Val)
}
}
}
return keys
default:
return []string{}
}
}
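For example:

	top(value, host, service, 2)  ->  [top host service]

The first argument is always replaced by the call name, duplicates are dropped via keyMap, and the trailing number literal falls through the *VarRef type switch, so it never appears.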
// Distinct represents a DISTINCT expression.
type Distinct struct {
// Identifier following DISTINCT


@ -451,7 +451,7 @@ func TestSelectStatement_IsRawQuerySet(t *testing.T) {
isRaw: false,
},
{
stmt: "select mean(*) from foo group by *",
stmt: "select mean(value) from foo group by *",
isRaw: false,
},
}


@ -12,13 +12,14 @@ import (
"math"
"math/rand"
"sort"
"strings"
)
// Iterator represents a forward-only iterator over a set of points.
// These are used by the MapFunctions in this file
type Iterator interface {
Next() (time int64, value interface{})
Tags() map[string]string
TMin() int64
}
// MapFunc represents a function used for mapping over a sequential series of data.
@ -39,39 +40,6 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
return MapRawQuery, nil
}
// Ensure that there is either a single argument or if for percentile, two
if c.Name == "percentile" {
if len(c.Args) != 2 {
return nil, fmt.Errorf("expected two arguments for %s()", c.Name)
}
} else if strings.HasSuffix(c.Name, "derivative") {
// derivatives require a field name and optional duration
if len(c.Args) == 0 {
return nil, fmt.Errorf("expected field name argument for %s()", c.Name)
}
} else if len(c.Args) != 1 {
return nil, fmt.Errorf("expected one argument for %s()", c.Name)
}
// derivative can take a nested aggregate function, everything else expects
// a variable reference as the first arg
if !strings.HasSuffix(c.Name, "derivative") {
// Ensure the argument is appropriate for the aggregate function.
switch fc := c.Args[0].(type) {
case *VarRef:
case *Distinct:
if c.Name != "count" {
return nil, fmt.Errorf("expected field argument in %s()", c.Name)
}
case *Call:
if fc.Name != "distinct" {
return nil, fmt.Errorf("expected field argument in %s()", c.Name)
}
default:
return nil, fmt.Errorf("expected field argument in %s()", c.Name)
}
}
// Retrieve map function by name.
switch c.Name {
case "count":
@ -104,11 +72,11 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
return MapFirst, nil
case "last":
return MapLast, nil
case "top":
return func(itr Iterator) interface{} {
return MapTop(itr, c)
}, nil
case "percentile":
_, ok := c.Args[1].(*NumberLiteral)
if !ok {
return nil, fmt.Errorf("expected float argument in percentile()")
}
return MapEcho, nil
case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then
@ -156,16 +124,14 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
return ReduceFirst, nil
case "last":
return ReduceLast, nil
case "top":
return func(values []interface{}) interface{} {
return ReduceTop(values, c)
}, nil
case "percentile":
if len(c.Args) != 2 {
return nil, fmt.Errorf("expected float argument in percentile()")
}
lit, ok := c.Args[1].(*NumberLiteral)
if !ok {
return nil, fmt.Errorf("expected float argument in percentile()")
}
return ReducePercentile(lit.Val), nil
return func(values []interface{}) interface{} {
return ReducePercentile(values, c)
}, nil
case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate
@ -204,7 +170,7 @@ func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) {
}, nil
case "distinct":
return func(b []byte) (interface{}, error) {
var val distinctValues
var val interfaceValues
err := json.Unmarshal(b, &val)
return val, err
}, nil
@ -253,12 +219,14 @@ func MapCount(itr Iterator) interface{} {
return nil
}
type distinctValues []interface{}
type interfaceValues []interface{}
func (d distinctValues) Len() int { return len(d) }
func (d distinctValues) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d distinctValues) Less(i, j int) bool {
func (d interfaceValues) Len() int { return len(d) }
func (d interfaceValues) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d interfaceValues) Less(i, j int) bool {
// Sort by type if types match
// Sort by float64/int64 first as that is the most likely match
{
d1, ok1 := d[i].(float64)
d2, ok2 := d[j].(float64)
@ -267,6 +235,23 @@ func (d distinctValues) Less(i, j int) bool {
}
}
{
d1, ok1 := d[i].(int64)
d2, ok2 := d[j].(int64)
if ok1 && ok2 {
return d1 < d2
}
}
// Sort by every remaining numeric type
{
d1, ok1 := d[i].(float32)
d2, ok2 := d[j].(float32)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(uint64)
d2, ok2 := d[j].(uint64)
@ -275,6 +260,54 @@ func (d distinctValues) Less(i, j int) bool {
}
}
{
d1, ok1 := d[i].(uint32)
d2, ok2 := d[j].(uint32)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(uint16)
d2, ok2 := d[j].(uint16)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(uint8)
d2, ok2 := d[j].(uint8)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(int32)
d2, ok2 := d[j].(int32)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(int16)
d2, ok2 := d[j].(int16)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(int8)
d2, ok2 := d[j].(int8)
if ok1 && ok2 {
return d1 < d2
}
}
{
d1, ok1 := d[i].(bool)
d2, ok2 := d[j].(bool)
@ -303,16 +336,30 @@ func (d distinctValues) Less(i, j int) bool {
switch v := val.(type) {
case uint64:
return intWeight, float64(v)
case uint32:
return intWeight, float64(v)
case uint16:
return intWeight, float64(v)
case uint8:
return intWeight, float64(v)
case int64:
return intWeight, float64(v)
case int32:
return intWeight, float64(v)
case int16:
return intWeight, float64(v)
case int8:
return intWeight, float64(v)
case float64:
return floatWeight, v
return floatWeight, float64(v)
case float32:
return floatWeight, float64(v)
case bool:
return boolWeight, 0
case string:
return stringWeight, 0
}
panic("unreachable code")
panic("interfaceValues.Less - unreachable code")
}
w1, n1 := infer(d[i])
@ -338,7 +385,7 @@ func MapDistinct(itr Iterator) interface{} {
return nil
}
results := make(distinctValues, len(index))
results := make(interfaceValues, len(index))
var i int
for value, _ := range index {
results[i] = value
@ -356,7 +403,7 @@ func ReduceDistinct(values []interface{}) interface{} {
if v == nil {
continue
}
d, ok := v.(distinctValues)
d, ok := v.(interfaceValues)
if !ok {
msg := fmt.Sprintf("expected interfaceValues, got: %T", v)
panic(msg)
@ -367,7 +414,7 @@ func ReduceDistinct(values []interface{}) interface{} {
}
// convert map keys to an array
results := make(distinctValues, len(index))
results := make(interfaceValues, len(index))
var i int
for k, _ := range index {
results[i] = k
@ -1035,6 +1082,414 @@ func ReduceLast(values []interface{}) interface{} {
return nil
}
type positionOut struct {
points PositionPoints
callArgs []string // ordered args in the call
}
func (p *positionOut) lessKey(i, j int) bool {
t1, t2 := p.points[i].Tags, p.points[j].Tags
for _, k := range p.callArgs {
if t1[k] != t2[k] {
return t1[k] < t2[k]
}
}
return false
}
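In other words, lessKey breaks ties by comparing tag values in the order the user listed them in the call; the "tie on value, time, resolve based on tags" case in the function tests below relies on this to sort host "a" ahead of host "b".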
func (p *positionOut) less(i, j int, sortFloat func(d1, d2 float64) bool, sortInt64 func(d1, d2 int64) bool, sortUint64 func(d1, d2 uint64) bool) bool {
// Sort by float64/int64 first as that is the most likely match
{
d1, ok1 := p.points[i].Value.(float64)
d2, ok2 := p.points[j].Value.(float64)
if ok1 && ok2 {
return sortFloat(d1, d2)
}
}
{
d1, ok1 := p.points[i].Value.(int64)
d2, ok2 := p.points[j].Value.(int64)
if ok1 && ok2 {
return sortInt64(d1, d2)
}
}
// Sort by every remaining numeric type
{
d1, ok1 := p.points[i].Value.(float32)
d2, ok2 := p.points[j].Value.(float32)
if ok1 && ok2 {
return sortFloat(float64(d1), float64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(uint64)
d2, ok2 := p.points[j].Value.(uint64)
if ok1 && ok2 {
return sortUint64(d1, d2)
}
}
{
d1, ok1 := p.points[i].Value.(uint32)
d2, ok2 := p.points[j].Value.(uint32)
if ok1 && ok2 {
return sortUint64(uint64(d1), uint64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(uint16)
d2, ok2 := p.points[j].Value.(uint16)
if ok1 && ok2 {
return sortUint64(uint64(d1), uint64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(uint8)
d2, ok2 := p.points[j].Value.(uint8)
if ok1 && ok2 {
return sortUint64(uint64(d1), uint64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(int32)
d2, ok2 := p.points[j].Value.(int32)
if ok1 && ok2 {
return sortInt64(int64(d1), int64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(int16)
d2, ok2 := p.points[j].Value.(int16)
if ok1 && ok2 {
return sortInt64(int64(d1), int64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(int8)
d2, ok2 := p.points[j].Value.(int8)
if ok1 && ok2 {
return sortInt64(int64(d1), int64(d2))
}
}
{
d1, ok1 := p.points[i].Value.(bool)
d2, ok2 := p.points[j].Value.(bool)
if ok1 && ok2 {
return d1 == true && d2 == false
}
}
{
d1, ok1 := p.points[i].Value.(string)
d2, ok2 := p.points[j].Value.(string)
if ok1 && ok2 {
return d1 < d2
}
}
// Types did not match, need to sort based on arbitrary weighting of type
const (
intWeight = iota
floatWeight
boolWeight
stringWeight
)
infer := func(val interface{}) (int, float64) {
switch v := val.(type) {
case uint64:
return intWeight, float64(v)
case uint32:
return intWeight, float64(v)
case uint16:
return intWeight, float64(v)
case uint8:
return intWeight, float64(v)
case int64:
return intWeight, float64(v)
case int32:
return intWeight, float64(v)
case int16:
return intWeight, float64(v)
case int8:
return intWeight, float64(v)
case float64:
return floatWeight, float64(v)
case float32:
return floatWeight, float64(v)
case bool:
return boolWeight, 0
case string:
return stringWeight, 0
}
panic("interfaceValues.Less - unreachable code")
}
w1, n1 := infer(p.points[i].Value)
w2, n2 := infer(p.points[j].Value)
// If we had "numeric" data, use that for comparison
if (w1 == floatWeight || w1 == intWeight) && (w2 == floatWeight || w2 == intWeight) {
return sortFloat(n1, n2)
}
return w1 < w2
}
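The net effect for mismatched types: any numeric sorts before bool, which sorts before string, and cross-type numerics are compared as float64. This is why, in the "mixed numerics - ints, floats, & strings" test below, int64(53) survives the top-2 cut while the string "88" does not, even though 88 > 53 numerically.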
type PositionPoints []PositionPoint
type PositionPoint struct {
Time int64
Value interface{}
Tags map[string]string
}
type topMapOut struct {
positionOut
}
func (t topMapOut) Len() int { return len(t.points) }
func (t topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t topMapOut) Less(i, j int) bool {
sortFloat := func(d1, d2 float64) bool {
if d1 != d2 {
return d1 > d2
}
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
}
return t.lessKey(i, j)
}
sortInt64 := func(d1, d2 int64) bool {
if d1 != d2 {
return d1 > d2
}
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
}
return t.lessKey(i, j)
}
sortUint64 := func(d1, d2 uint64) bool {
if d1 != d2 {
return d1 > d2
}
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
}
return t.lessKey(i, j)
}
return t.less(i, j, sortFloat, sortInt64, sortUint64)
}
type topReduceOut struct {
positionOut
}
func (t topReduceOut) Len() int { return len(t.points) }
func (t topReduceOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t topReduceOut) Less(i, j int) bool {
// Now sort by time first, not value
sortFloat := func(d1, d2 float64) bool {
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
}
if d1 != d2 {
return d1 > d2
}
return t.lessKey(i, j)
}
sortInt64 := func(d1, d2 int64) bool {
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
}
if d1 != d2 {
return d1 > d2
}
return t.lessKey(i, j)
}
sortUint64 := func(d1, d2 uint64) bool {
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
}
if d1 != d2 {
return d1 > d2
}
return t.lessKey(i, j)
}
return t.less(i, j, sortFloat, sortInt64, sortUint64)
}
// topCallArgs returns any additional field/tag names that may be needed to sort with.
// It is important to maintain the order in which they were asked for in the call,
// for sorting purposes.
func topCallArgs(c *Call) []string {
var names []string
for _, v := range c.Args[1 : len(c.Args)-1] {
if f, ok := v.(*VarRef); ok {
names = append(names, f.Val)
}
}
return names
}
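For example (contrast with Call.Fields above, which also prepends the call name):

	top(value, host, service, 2)  ->  [host service]

Only the middle arguments survive; the leading value field and the trailing limit are sliced off.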
// MapTop emits the top data points for each group by interval
func MapTop(itr Iterator, c *Call) interface{} {
// Capture the limit if it was specified in the call
lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral)
limit := int64(lit.Val)
// Simple case where only value and limit are specified.
if len(c.Args) == 2 {
out := positionOut{callArgs: topCallArgs(c)}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
t := k
if bt := itr.TMin(); bt > -1 {
t = bt
}
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
}
// If we have more than we asked for, only send back the top values
if int64(len(out.points)) > limit {
sort.Sort(topMapOut{out})
out.points = out.points[:limit]
}
if len(out.points) > 0 {
return out.points
}
return nil
}
// They specified tags in the call to get unique sets, so we need to map them as we accumulate them
outMap := make(map[string]positionOut)
mapKey := func(args []string, fields map[string]interface{}, keys map[string]string) string {
key := ""
for _, a := range args {
if v, ok := fields[a]; ok {
key += a + ":" + fmt.Sprintf("%v", v) + ","
continue
}
if v, ok := keys[a]; ok {
key += a + ":" + v + ","
continue
}
}
return key
}
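// Hypothetical example: with callArgs [top host] and tags {host: server01},
// "top" matches neither a field nor a tag and is skipped, so the key is
// "host:server01," -- one bucket per unique combination of requested tags.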
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
t := k
if bt := itr.TMin(); bt > -1 {
t = bt
}
callArgs := c.Fields()
tags := itr.Tags()
// TODO in the future we need to send in fields as well
// this will allow a user to query on both fields and tags
// fields will take the priority over tags if there is a name collision
key := mapKey(callArgs, nil, tags)
if out, ok := outMap[key]; ok {
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
outMap[key] = out
} else {
out = positionOut{callArgs: topCallArgs(c)}
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
outMap[key] = out
}
}
// Sort all the maps
for k, v := range outMap {
sort.Sort(topMapOut{v})
outMap[k] = v
}
slice := func(needed int64, m map[string]positionOut) PositionPoints {
points := PositionPoints{}
var collected int64
for k, v := range m {
if len(v.points) > 0 {
points = append(points, v.points[0])
v.points = v.points[1:]
m[k] = v
collected++
}
}
o := positionOut{callArgs: topCallArgs(c), points: points}
sort.Sort(topMapOut{o})
points = o.points
// If we got more than we needed, sort them and return the top
if collected > needed {
points = o.points[:needed]
}
return points
}
points := PositionPoints{}
var collected int64
for collected < limit {
p := slice(limit-collected, outMap)
if len(p) == 0 {
break
}
points = append(points, p...)
collected += int64(len(p))
}
if len(points) > 0 {
return points
}
return nil
}
// ReduceTop computes the top values for each key.
func ReduceTop(values []interface{}, c *Call) interface{} {
lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral)
limit := int64(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
for _, v := range values {
if v == nil {
continue
}
o, _ := v.(PositionPoints)
out.points = append(out.points, o...)
}
// Get the top of the top values
sort.Sort(topMapOut{out})
// If we have more than we asked for, only send back the top values
if int64(len(out.points)) > limit {
out.points = out.points[:limit]
}
// Now re-sort the top values by time
sort.Sort(topReduceOut{out})
if len(out.points) > 0 {
return out.points
}
return nil
}
// MapEcho emits the data points for each group by interval
func MapEcho(itr Iterator) interface{} {
var values []interface{}
@ -1046,36 +1501,39 @@ func MapEcho(itr Iterator) interface{} {
}
// ReducePercentile computes the percentile of values for each key.
func ReducePercentile(percentile float64) ReduceFunc {
return func(values []interface{}) interface{} {
var allValues []float64
func ReducePercentile(values []interface{}, c *Call) interface{} {
// Checks that this arg exists and is a valid type are done in the parsing validation
// and have test coverage there
lit, _ := c.Args[1].(*NumberLiteral)
percentile := lit.Val
for _, v := range values {
if v == nil {
continue
}
var allValues []float64
vals := v.([]interface{})
for _, v := range vals {
switch v.(type) {
case int64:
allValues = append(allValues, float64(v.(int64)))
case float64:
allValues = append(allValues, v.(float64))
}
}
for _, v := range values {
if v == nil {
continue
}
sort.Float64s(allValues)
length := len(allValues)
index := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
if index < 0 || index >= len(allValues) {
return nil
vals := v.([]interface{})
for _, v := range vals {
switch v.(type) {
case int64:
allValues = append(allValues, float64(v.(int64)))
case float64:
allValues = append(allValues, v.(float64))
}
}
return allValues[index]
}
sort.Float64s(allValues)
length := len(allValues)
index := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
if index < 0 || index >= len(allValues) {
return nil
}
return allValues[index]
}
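This is a nearest-rank percentile rather than an interpolated one. For example, with allValues = [10, 20, 30, 40] and percentile = 50, index = floor(4*50/100 + 0.5) - 1 = 1, so 20 is returned.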
// IsNumeric returns whether a given aggregate can only be run on numeric fields.


@ -14,15 +14,24 @@ type point struct {
seriesKey string
time int64
value interface{}
tags map[string]string
}
type testIterator struct {
values []point
values []point
lastTags map[string]string
nextFunc func() (timestamp int64, value interface{})
tagsFunc func() map[string]string
tMinFunc func() int64
}
func (t *testIterator) Next() (timestamp int64, value interface{}) {
if t.nextFunc != nil {
return t.nextFunc()
}
if len(t.values) > 0 {
v := t.values[0]
t.lastTags = t.values[0].tags
t.values = t.values[1:]
return v.time, v.value
}
@ -30,6 +39,20 @@ func (t *testIterator) Next() (timestamp int64, value interface{}) {
return -1, nil
}
func (t *testIterator) Tags() map[string]string {
if t.tagsFunc != nil {
return t.tagsFunc()
}
return t.lastTags
}
func (t *testIterator) TMin() int64 {
if t.tMinFunc != nil {
return t.tMinFunc()
}
return -1
}
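Note that a tMinFunc result greater than -1 simulates a group-by bucket start time; MapTop uses TMin to stamp emitted points with the bucket time instead of the raw point time.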
func TestMapMeanNoValues(t *testing.T) {
iter := &testIterator{}
if got := MapMean(iter); got != nil {
@ -44,13 +67,13 @@ func TestMapMean(t *testing.T) {
output *meanMapOutput
}{
{ // Single point
input: []point{point{"0", 1, 1.0}},
input: []point{point{"0", 1, 1.0, nil}},
output: &meanMapOutput{1, 1, Float64Type},
},
{ // Two points
input: []point{
point{"0", 1, 2.0},
point{"0", 2, 8.0},
point{"0", 1, 2.0, nil},
point{"0", 2, 8.0, nil},
},
output: &meanMapOutput{2, 5.0, Float64Type},
},
@ -71,55 +94,12 @@ func TestMapMean(t *testing.T) {
}
}
}
func TestInitializeMapFuncPercentile(t *testing.T) {
// No args
c := &Call{
Name: "percentile",
Args: []Expr{},
}
_, err := InitializeMapFunc(c)
if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
}
if exp := "expected two arguments for percentile()"; err.Error() != exp {
t.Errorf("InitializeMapFunc(%v) mismatch. exp %v got %v", c, exp, err.Error())
}
// No percentile arg
c = &Call{
Name: "percentile",
Args: []Expr{
&VarRef{Val: "field1"},
},
}
_, err = InitializeMapFunc(c)
if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
}
if exp := "expected two arguments for percentile()"; err.Error() != exp {
t.Errorf("InitializeMapFunc(%v) mismatch. exp %v got %v", c, exp, err.Error())
}
}
func TestInitializeMapFuncDerivative(t *testing.T) {
for _, fn := range []string{"derivative", "non_negative_derivative"} {
// No args should fail
c := &Call{
Name: fn,
Args: []Expr{},
}
_, err := InitializeMapFunc(c)
if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
}
// Single field arg should return MapEcho
c = &Call{
c := &Call{
Name: fn,
Args: []Expr{
&VarRef{Val: " field1"},
@ -127,7 +107,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
},
}
_, err = InitializeMapFunc(c)
_, err := InitializeMapFunc(c)
if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
}
@ -148,48 +128,14 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
}
}
func TestInitializeReduceFuncPercentile(t *testing.T) {
// No args
c := &Call{
Name: "percentile",
Args: []Expr{},
}
_, err := InitializeReduceFunc(c)
if err == nil {
t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c)
}
if exp := "expected float argument in percentile()"; err.Error() != exp {
t.Errorf("InitializedReduceFunc(%v) mismatch. exp %v got %v", c, exp, err.Error())
}
// No percentile arg
c = &Call{
Name: "percentile",
Args: []Expr{
&VarRef{Val: "field1"},
},
}
_, err = InitializeReduceFunc(c)
if err == nil {
t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c)
}
if exp := "expected float argument in percentile()"; err.Error() != exp {
t.Errorf("InitializedReduceFunc(%v) mismatch. exp %v got %v", c, exp, err.Error())
}
}
func TestReducePercentileNil(t *testing.T) {
// ReducePercentile should ignore nil values when calculating the percentile
fn := ReducePercentile(100)
input := []interface{}{
nil,
}
got := fn(input)
// ReducePercentile should ignore nil values when calculating the percentile
got := ReducePercentile(input, &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 100}}})
if got != nil {
t.Fatalf("ReducePercentile(100) returned wrong type. exp nil got %v", got)
}
@ -212,16 +158,16 @@ func TestMapDistinct(t *testing.T) {
iter := &testIterator{
values: []point{
{seriesKey1, timeId1, uint64(1)},
{seriesKey1, timeId2, uint64(1)},
{seriesKey1, timeId3, "1"},
{seriesKey2, timeId4, uint64(1)},
{seriesKey2, timeId5, float64(1.0)},
{seriesKey2, timeId6, "1"},
{seriesKey1, timeId1, uint64(1), nil},
{seriesKey1, timeId2, uint64(1), nil},
{seriesKey1, timeId3, "1", nil},
{seriesKey2, timeId4, uint64(1), nil},
{seriesKey2, timeId5, float64(1.0), nil},
{seriesKey2, timeId6, "1", nil},
},
}
values := MapDistinct(iter).(distinctValues)
values := MapDistinct(iter).(interfaceValues)
if exp, got := 3, len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)
@ -229,7 +175,7 @@ func TestMapDistinct(t *testing.T) {
sort.Sort(values)
exp := distinctValues{
exp := interfaceValues{
uint64(1),
float64(1),
"1",
@ -253,7 +199,7 @@ func TestMapDistinctNil(t *testing.T) {
}
func TestReduceDistinct(t *testing.T) {
v1 := distinctValues{
v1 := interfaceValues{
"2",
"1",
float64(2.0),
@ -264,7 +210,7 @@ func TestReduceDistinct(t *testing.T) {
false,
}
expect := distinctValues{
expect := interfaceValues{
uint64(1),
float64(1),
uint64(2),
@ -301,11 +247,11 @@ func TestReduceDistinctNil(t *testing.T) {
},
{
name: "empty mappper (len 1)",
values: []interface{}{distinctValues{}},
values: []interface{}{interfaceValues{}},
},
{
name: "empty mappper (len 2)",
values: []interface{}{distinctValues{}, distinctValues{}},
values: []interface{}{interfaceValues{}, interfaceValues{}},
},
}
@ -319,7 +265,7 @@ func TestReduceDistinctNil(t *testing.T) {
}
func Test_distinctValues_Sort(t *testing.T) {
values := distinctValues{
values := interfaceValues{
"2",
"1",
float64(2.0),
@ -330,7 +276,7 @@ func Test_distinctValues_Sort(t *testing.T) {
false,
}
expect := distinctValues{
expect := interfaceValues{
uint64(1),
float64(1),
uint64(2),
@ -366,13 +312,13 @@ func TestMapCountDistinct(t *testing.T) {
iter := &testIterator{
values: []point{
{seriesKey1, timeId1, uint64(1)},
{seriesKey1, timeId2, uint64(1)},
{seriesKey1, timeId3, "1"},
{seriesKey2, timeId4, uint64(1)},
{seriesKey2, timeId5, float64(1.0)},
{seriesKey2, timeId6, "1"},
{seriesKey2, timeId7, true},
{seriesKey1, timeId1, uint64(1), nil},
{seriesKey1, timeId2, uint64(1), nil},
{seriesKey1, timeId3, "1", nil},
{seriesKey2, timeId4, uint64(1), nil},
{seriesKey2, timeId5, float64(1.0), nil},
{seriesKey2, timeId6, "1", nil},
{seriesKey2, timeId7, true, nil},
},
}
@ -532,3 +478,276 @@ func BenchmarkGetSortedRangeBySort(b *testing.B) {
}
benchGetSortedRangeResults = results
}
func TestMapTop(t *testing.T) {
tests := []struct {
name string
skip bool
iter *testIterator
exp positionOut
call *Call
}{
{
name: "int64 - basic",
iter: &testIterator{
values: []point{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, int64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "int64 - basic with tag",
iter: &testIterator{
values: []point{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 20, int64(53), map[string]string{"host": "b"}},
{"", 30, int64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(53), map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, resolve based on time",
iter: &testIterator{
values: []point{
{"", 20, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "a"}},
{"", 10, int64(99), map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, time, resolve based on tags",
iter: &testIterator{
values: []point{
{"", 10, int64(99), map[string]string{"host": "b"}},
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 20, int64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{10, int64(99), map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints",
iter: &testIterator{
values: []point{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, uint64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints & floats",
iter: &testIterator{
values: []point{
{"", 10, float64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, uint64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, float64(99), map[string]string{"host": "a"}},
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints, floats, & strings",
iter: &testIterator{
values: []point{
{"", 10, float64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, "88", map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, float64(99), map[string]string{"host": "a"}},
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "bools",
iter: &testIterator{
values: []point{
{"", 10, true, map[string]string{"host": "a"}},
{"", 10, true, map[string]string{"host": "b"}},
{"", 20, false, map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, true, map[string]string{"host": "a"}},
PositionPoint{10, true, map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
}
for _, test := range tests {
if test.skip {
continue
}
values := MapTop(test.iter, test.call).(PositionPoints)
t.Logf("Test: %s", test.name)
if exp, got := len(test.exp.points), len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)
}
if !reflect.DeepEqual(values, test.exp.points) {
t.Errorf("Wrong values. \nexp\n %v\ngot\n %v", spew.Sdump(test.exp.points), spew.Sdump(values))
}
}
}
func TestReduceTop(t *testing.T) {
tests := []struct {
name string
skip bool
values []interface{}
exp PositionPoints
call *Call
}{
{
name: "int64 - single map",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
},
},
exp: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
},
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
},
},
exp: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map with nil",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
},
nil,
},
exp: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map with non-matching tags and tag selected",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{}},
},
nil,
},
exp: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
},
{
skip: true,
name: "int64 - double map with non-matching tags",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{}},
},
nil,
},
exp: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(55), map[string]string{"host": "b"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
},
}
for _, test := range tests {
if test.skip {
continue
}
values := ReduceTop(test.values, test.call)
t.Logf("Test: %s", test.name)
if values != nil {
v, _ := values.(PositionPoints)
if exp, got := len(test.exp), len(v); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)
}
}
if !reflect.DeepEqual(values, test.exp) {
t.Errorf("Wrong values. \nexp\n %v\ngot\n %v", spew.Sdump(test.exp), spew.Sdump(values))
}
}
}


@ -213,6 +213,65 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// select percentile statements
{
s: `select percentile("field1", 2.0) from cpu`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "percentile", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2.0}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
},
},
// select top statements
{
s: `select top("field1", 2) from cpu`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
},
},
{
s: `select top(field1, 2) from cpu`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
},
},
{
s: `select top(field1, 2), tag1 from cpu`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}},
{Expr: &influxql.VarRef{Val: "tag1"}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
},
},
{
s: `select top(field1, tag1, 2), tag1 from cpu`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "tag1"}, &influxql.NumberLiteral{Val: 2}}}},
{Expr: &influxql.VarRef{Val: "tag1"}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
},
},
// select distinct statements
{
s: `select distinct(field1) from cpu`,
@ -1252,6 +1311,21 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 FROM myseries GROUP`, err: `found EOF, expected BY at line 1, char 35`},
{s: `SELECT field1 FROM myseries LIMIT`, err: `found EOF, expected number at line 1, char 35`},
{s: `SELECT field1 FROM myseries LIMIT 10.5`, err: `fractional parts not allowed in LIMIT at line 1, char 35`},
{s: `SELECT top() FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 0`},
{s: `SELECT top(field1) FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 1`},
{s: `SELECT top(field1,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`},
{s: `SELECT top(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`},
{s: `SELECT bottom() FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 0`},
{s: `SELECT bottom(field1) FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
{s: `SELECT bottom(field1,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`},
{s: `SELECT bottom(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`},
{s: `SELECT percentile() FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 0`},
{s: `SELECT percentile(field1) FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT percentile(field1, foo) FROM myseries`, err: `expected float argument in percentile()`},
{s: `SELECT field1 FROM myseries OFFSET`, err: `found EOF, expected number at line 1, char 36`},
{s: `SELECT field1 FROM myseries OFFSET 10.5`, err: `fractional parts not allowed in OFFSET at line 1, char 36`},
{s: `SELECT field1 FROM myseries ORDER`, err: `found EOF, expected BY at line 1, char 35`},
@ -1268,7 +1342,6 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 FROM 12`, err: `found 12, expected identifier at line 1, char 20`},
{s: `SELECT 1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 FROM myseries`, err: `unable to parse number at line 1, char 8`},
{s: `SELECT 10.5h FROM myseries`, err: `found h, expected FROM at line 1, char 12`},
{s: `SELECT derivative(field1), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `SELECT distinct(field1), sum(field1) FROM myseries`, err: `aggregate function distinct() can not be combined with other functions or fields`},
{s: `SELECT distinct(field1), field2 FROM myseries`, err: `aggregate function distinct() can not be combined with other functions or fields`},
{s: `SELECT distinct(field1, field2) FROM myseries`, err: `distinct function can only have one argument`},
@ -1279,8 +1352,12 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT count(distinct field1, field2) FROM myseries`, err: `count(distinct <field>) can only have one argument`},
{s: `select count(distinct(too, many, arguments)) from myseries`, err: `count(distinct <field>) can only have one argument`},
{s: `select count() from myseries`, err: `invalid number of arguments for count, expected 1, got 0`},
{s: `SELECT derivative(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `select derivative() from myseries`, err: `invalid number of arguments for derivative, expected at least 1 but no more than 2, got 0`},
{s: `select derivative(mean(value), 1h, 3) from myseries`, err: `invalid number of arguments for derivative, expected at least 1 but no more than 2, got 3`},
{s: `SELECT non_negative_derivative(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `select non_negative_derivative() from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 0`},
{s: `select non_negative_derivative(mean(value), 1h, 3) from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 3`},
{s: `SELECT field1 from myseries WHERE host =~ 'asd' LIMIT 1`, err: `found asd, expected regex at line 1, char 42`},
{s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
{s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
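// For contrast with the error cases above, top() and bottom() are expected to
// accept a field, optional tag names, and a trailing integer N. Illustrative
// queries only (not entries from the original test table):
//   SELECT top(field1, 2) FROM myseries
//   SELECT top(field1, host, server, 2) FROM myseries
//   SELECT bottom(field1, host, 2) FROM myseries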

View File

@ -625,13 +625,13 @@ func TestStatementExecutor_ExecuteStatement_CreateContinuousQuery(t *testing.T)
t.Fatalf("unexpected database: %s", database)
} else if name != "cq0" {
t.Fatalf("unexpected name: %s", name)
} else if query != `CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END` {
} else if query != `CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(field1) INTO db1 FROM db0 GROUP BY time(1h) END` {
t.Fatalf("unexpected query: %s", query)
}
return nil
}
stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END`)
stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(field1) INTO db1 FROM db0 GROUP BY time(1h) END`)
if res := e.ExecuteStatement(stmt); res.Err != nil {
t.Fatal(res.Err)
} else if res.Series != nil {
@ -646,7 +646,7 @@ func TestStatementExecutor_ExecuteStatement_CreateContinuousQuery_Err(t *testing
return errors.New("marker")
}
stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END`)
stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(field1) INTO db1 FROM db0 GROUP BY time(1h) END`)
if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
t.Fatalf("unexpected error: %s", res.Err)
}
@ -693,14 +693,14 @@ func TestStatementExecutor_ExecuteStatement_ShowContinuousQueries(t *testing.T)
{
Name: "db0",
ContinuousQueries: []meta.ContinuousQueryInfo{
{Name: "cq0", Query: "SELECT count(*) INTO db1 FROM db0"},
{Name: "cq1", Query: "SELECT count(*) INTO db2 FROM db0"},
{Name: "cq0", Query: "SELECT count(field1) INTO db1 FROM db0"},
{Name: "cq1", Query: "SELECT count(field1) INTO db2 FROM db0"},
},
},
{
Name: "db1",
ContinuousQueries: []meta.ContinuousQueryInfo{
{Name: "cq2", Query: "SELECT count(*) INTO db3 FROM db1"},
{Name: "cq2", Query: "SELECT count(field1) INTO db3 FROM db1"},
},
},
}, nil
@ -714,15 +714,15 @@ func TestStatementExecutor_ExecuteStatement_ShowContinuousQueries(t *testing.T)
Name: "db0",
Columns: []string{"name", "query"},
Values: [][]interface{}{
{"cq0", "SELECT count(*) INTO db1 FROM db0"},
{"cq1", "SELECT count(*) INTO db2 FROM db0"},
{"cq0", "SELECT count(field1) INTO db1 FROM db0"},
{"cq1", "SELECT count(field1) INTO db2 FROM db0"},
},
},
{
Name: "db1",
Columns: []string{"name", "query"},
Values: [][]interface{}{
{"cq2", "SELECT count(*) INTO db3 FROM db1"},
{"cq2", "SELECT count(field1) INTO db3 FROM db1"},
},
},
}) {
@ -755,7 +755,7 @@ func TestStatementExecutor_ExecuteStatement_Unsupported(t *testing.T) {
// Execute a SELECT statement.
NewStatementExecutor().ExecuteStatement(
influxql.MustParseStatement(`SELECT count(*) FROM db0`),
influxql.MustParseStatement(`SELECT count(field1) FROM db0`),
)
}()

37
pkg/slices/strings.go Normal file
View File

@ -0,0 +1,37 @@
// Package slices provides helpers for working with slices of strings.
package slices

import "strings"

// Union appends to setA each element of setB that setA does not already
// contain, comparing case-insensitively when ignoreCase is true, and
// returns the combined slice.
func Union(setA, setB []string, ignoreCase bool) []string {
	for _, b := range setB {
		if ignoreCase {
			if !ExistsIgnoreCase(setA, b) {
				setA = append(setA, b)
			}
			continue
		}
		if !Exists(setA, b) {
			setA = append(setA, b)
		}
	}
	return setA
}

// Exists reports whether find is present in set.
func Exists(set []string, find string) bool {
	for _, s := range set {
		if s == find {
			return true
		}
	}
	return false
}

// ExistsIgnoreCase reports whether find is present in set, ignoring case.
func ExistsIgnoreCase(set []string, find string) bool {
	find = strings.ToLower(find)
	for _, s := range set {
		if strings.ToLower(s) == find {
			return true
		}
	}
	return false
}
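A quick sketch of how these helpers compose (hypothetical usage, not part of the commit):

	// Merge column names from two sources, comparing case-insensitively.
	cols := slices.Union([]string{"time", "value"}, []string{"VALUE", "host"}, true)
	// cols is now []string{"time", "value", "host"}.
	if slices.Exists(cols, "host") {
		// "host" was appended; "VALUE" was dropped as a duplicate of "value".
	}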

View File

@ -397,11 +397,7 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
}
// Put together the rows to return, starting with columns.
columnNames := make([]string, len(e.stmt.Fields)+1)
columnNames[0] = "time"
for i, f := range e.stmt.Fields {
columnNames[i+1] = f.Name()
}
columnNames := e.stmt.ColumnNames()
// Open the mappers.
for _, m := range e.mappers {
@ -528,6 +524,12 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
}
}
// Perform top/bottom unwraps
values, err = e.processTopBottom(values, columnNames)
if err != nil {
	out <- &influxql.Row{Err: err}
	return
}
// Perform any mathematics.
values = processForMath(e.stmt.Fields, values)
@ -622,6 +624,67 @@ func (e *SelectExecutor) close() {
}
}
func (e *SelectExecutor) processTopBottom(results [][]interface{}, columnNames []string) ([][]interface{}, error) {
aggregates := e.stmt.FunctionCalls()
var call *influxql.Call
process := false
for _, c := range aggregates {
if c.Name == "top" || c.Name == "bottom" {
process = true
call = c
break
}
}
if !process {
return results, nil
}
var values [][]interface{}
// Rewrite the entire result by flattening out any top/bottom point aggregates
// into one output row per position point.
for _, vals := range results {
// start at 1 because the first value is always time
for j := 1; j < len(vals); j++ {
switch v := vals[j].(type) {
case influxql.PositionPoints:
tMin := vals[0].(time.Time)
for _, p := range v {
result := e.topBottomPointToQueryResult(p, tMin, call, columnNames)
values = append(values, result)
}
case nil:
continue
default:
return nil, fmt.Errorf("unreachable code - processTopBottom")
}
}
}
return values, nil
}
func (e *SelectExecutor) topBottomPointToQueryResult(p influxql.PositionPoint, tMin time.Time, call *influxql.Call, columnNames []string) []interface{} {
tm := time.Unix(0, p.Time).UTC().Format(time.RFC3339Nano)
// If we didn't explicitly ask for time, and we have a group by, then use tMin for the time returned
if len(e.stmt.Dimensions) > 0 && !e.stmt.HasTimeFieldSpecified() {
tm = tMin.UTC().Format(time.RFC3339Nano)
}
vals := []interface{}{tm}
for _, c := range columnNames {
if c == call.Name {
vals = append(vals, p.Value)
continue
}
// TODO: in the future, fields will also be available to us;
// we should always favor fields over tags if there is a name collision.
// For now, look in the tags for a value.
if t, ok := p.Tags[c]; ok {
vals = append(vals, t)
}
}
return vals
}
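// To make the unwrap above concrete, a worked example with invented values:
// given columns [time, top, host] and one aggregate bucket whose row is
//   [2000-01-01T00:00:00Z, PositionPoints{{Time: t1, Value: 9, Tags: {host: "server08"}},
//                                         {Time: t2, Value: 7, Tags: {host: "server07"}}}]
// processTopBottom emits one flattened row per position point:
//   [2000-01-01T00:00:00Z, 9, "server08"]
//   [2000-01-01T00:00:00Z, 7, "server07"]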
// limitedRowWriter accepts raw mapper values, and will emit those values as rows in chunks
// of the given size. If the chunk size is 0, no chunking will be performed. In addition, if
// the limit is reached, outstanding values will be emitted. If limit is zero, no limit is enforced.
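// As a minimal sketch of that policy in isolation (a hypothetical helper, not
// the actual limitedRowWriter implementation):
func emitChunks(vals []interface{}, chunkSize, limit int, out chan []interface{}) {
	// Enforce the limit first; a zero limit means unlimited.
	if limit > 0 && len(vals) > limit {
		vals = vals[:limit]
	}
	// A zero chunk size means emit everything as a single chunk.
	if chunkSize == 0 {
		out <- vals
		return
	}
	// Otherwise emit fixed-size chunks, flushing any remainder at the end.
	for len(vals) > 0 {
		n := chunkSize
		if n > len(vals) {
			n = len(vals)
		}
		out <- vals[:n]
		vals = vals[n:]
	}
}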

View File

@ -343,7 +343,7 @@ func (lm *SelectMapper) nextChunkRaw() (interface{}, error) {
}
cursor := lm.cursors[lm.currCursorIndex]
k, v, t := cursor.Next(lm.queryTMin, lm.queryTMax, lm.selectFields, lm.whereFields)
k, v := cursor.Next(lm.queryTMin, lm.queryTMax, lm.selectFields, lm.whereFields)
if v == nil {
// Tagset cursor is empty, move to next one.
lm.currCursorIndex++
@ -363,7 +363,7 @@ func (lm *SelectMapper) nextChunkRaw() (interface{}, error) {
cursorKey: cursor.key(),
}
}
value := &MapperValue{Time: k, Value: v, Tags: t}
value := &MapperValue{Time: k, Value: v, Tags: cursor.Tags()}
output.Values = append(output.Values, value)
if len(output.Values) == lm.chunkSize {
return output, nil
@ -435,13 +435,29 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
heap.Push(tsc.pointHeap, p)
}
// Wrap the tagset cursor so it implements the mapping functions interface.
f := func() (time int64, value interface{}) {
k, v, _ := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields)
nextf := func() (time int64, value interface{}) {
k, v := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields)
return k, v
}
tagf := func() map[string]string {
return tsc.Tags()
}
tminf := func() int64 {
	// Without a GROUP BY there is no interval floor to report.
	if len(lm.selectStmt.Dimensions) == 0 {
		return -1
	}
	// If time wasn't explicitly selected, report the bucket's floor time
	// so consumers can use it instead of each point's own timestamp.
	if !lm.selectStmt.HasTimeFieldSpecified() {
		return tmin
	}
	return -1
}
tagSetCursor := &aggTagSetCursor{
nextFunc: f,
nextFunc: nextf,
tagsFunc: tagf,
tMinFunc: tminf,
}
// Execute the map function which walks the entire interval, and aggregates
@ -619,6 +635,8 @@ func (lm *SelectMapper) Close() {
// by intervals.
type aggTagSetCursor struct {
nextFunc func() (time int64, value interface{})
tagsFunc func() map[string]string
tMinFunc func() int64
}
// Next returns the next value for the aggTagSetCursor. It implements the interface expected
@ -627,6 +645,16 @@ func (a *aggTagSetCursor) Next() (time int64, value interface{}) {
return a.nextFunc()
}
// Tags returns the current tags for the cursor
func (a *aggTagSetCursor) Tags() map[string]string {
return a.tagsFunc()
}
// TMin returns the current floor time for the bucket being worked on
func (a *aggTagSetCursor) TMin() int64 {
return a.tMinFunc()
}
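// Downstream code can use the -1 sentinel from TMin to decide which timestamp
// a row should report. A hypothetical consumer (not part of this commit):
func rowTime(c *aggTagSetCursor, pointTime int64) int64 {
	// Prefer the bucket floor when the cursor supplies one.
	if t := c.TMin(); t >= 0 {
		return t
	}
	return pointTime
}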
type pointHeapItem struct {
timestamp int64
value []byte
@ -670,6 +698,7 @@ type tagSetCursor struct {
tags map[string]string // Tag key-value pairs
cursors []*seriesCursor // Underlying series cursors.
decoder *FieldCodec // decoder for the raw data bytes
currentTags map[string]string // the current tags for the underlying series cursor in play
// pointHeap is a min-heap, ordered by timestamp, that contains the next
// point from each seriesCursor. Queries sometimes pull points from
@ -723,11 +752,11 @@ func (tsc *tagSetCursor) key() string {
// Next returns the next matching timestamp and value for the tagset. Filtering
// is enforced on the values. If there is no matching value, then a nil result is returned.
func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}, map[string]string) {
func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}) {
for {
// If we're out of points, we're done.
if tsc.pointHeap.Len() == 0 {
return -1, nil, nil
return -1, nil
}
// Grab the next point with the lowest timestamp.
@ -735,13 +764,16 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri
// We're done if the point is outside the query's time range [tmin:tmax).
if p.timestamp != tmin && (tmin > p.timestamp || p.timestamp >= tmax) {
return -1, nil, nil
return -1, nil
}
// Decode the raw point.
value := tsc.decodeRawPoint(p, selectFields, whereFields)
timestamp := p.timestamp
tags := p.cursor.tags
// Keep track of the current tags for the series cursor so we can
// respond with them if asked
tsc.currentTags = p.cursor.tags
// Advance the cursor
nextKey, nextVal := p.cursor.Next()
@ -759,10 +791,16 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri
continue
}
return timestamp, value, tags
return timestamp, value
}
}
// Tags returns the current tags of the current cursor.
// If there is no current cursor, it returns nil.
func (tsc *tagSetCursor) Tags() map[string]string {
return tsc.currentTags
}
// decodeRawPoint decodes raw point data into field names & values and does WHERE filtering.
func (tsc *tagSetCursor) decodeRawPoint(p *pointHeapItem, selectFields, whereFields []string) interface{} {
if len(selectFields) > 1 {