parent
5c1ba44c9b
commit
66fc270d1e
|
@ -386,9 +386,9 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
|
||||||
// the offsets within the value slices that are returned by the
|
// the offsets within the value slices that are returned by the
|
||||||
// mapper.
|
// mapper.
|
||||||
aggregates := e.stmt.FunctionCalls()
|
aggregates := e.stmt.FunctionCalls()
|
||||||
reduceFuncs := make([]influxql.ReduceFunc, len(aggregates))
|
reduceFuncs := make([]ReduceFunc, len(aggregates))
|
||||||
for i, c := range aggregates {
|
for i, c := range aggregates {
|
||||||
reduceFunc, err := influxql.InitializeReduceFunc(c)
|
reduceFunc, err := InitializeReduceFunc(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
out <- &influxql.Row{Err: err}
|
out <- &influxql.Row{Err: err}
|
||||||
return
|
return
|
||||||
|
@ -646,7 +646,7 @@ func (e *SelectExecutor) processTopBottom(results [][]interface{}, columnNames [
|
||||||
// start at 1 because the first value is always time
|
// start at 1 because the first value is always time
|
||||||
for j := 1; j < len(vals); j++ {
|
for j := 1; j < len(vals); j++ {
|
||||||
switch v := vals[j].(type) {
|
switch v := vals[j].(type) {
|
||||||
case influxql.PositionPoints:
|
case PositionPoints:
|
||||||
tMin := vals[0].(time.Time)
|
tMin := vals[0].(time.Time)
|
||||||
for _, p := range v {
|
for _, p := range v {
|
||||||
result := e.topBottomPointToQueryResult(p, tMin, call, columnNames)
|
result := e.topBottomPointToQueryResult(p, tMin, call, columnNames)
|
||||||
|
@ -662,7 +662,7 @@ func (e *SelectExecutor) processTopBottom(results [][]interface{}, columnNames [
|
||||||
return values, nil
|
return values, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *SelectExecutor) topBottomPointToQueryResult(p influxql.PositionPoint, tMin time.Time, call *influxql.Call, columnNames []string) []interface{} {
|
func (e *SelectExecutor) topBottomPointToQueryResult(p PositionPoint, tMin time.Time, call *influxql.Call, columnNames []string) []interface{} {
|
||||||
tm := time.Unix(0, p.Time).UTC().Format(time.RFC3339Nano)
|
tm := time.Unix(0, p.Time).UTC().Format(time.RFC3339Nano)
|
||||||
// If we didn't explicity ask for time, and we have a group by, then use TMIN for the time returned
|
// If we didn't explicity ask for time, and we have a group by, then use TMIN for the time returned
|
||||||
if len(e.stmt.Dimensions) > 0 && !e.stmt.HasTimeFieldSpecified() {
|
if len(e.stmt.Dimensions) > 0 && !e.stmt.HasTimeFieldSpecified() {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package influxql
|
package tsdb
|
||||||
|
|
||||||
// All aggregate and query functions are defined in this file along with any intermediate data objects they need to process.
|
// All aggregate and query functions are defined in this file along with any intermediate data objects they need to process.
|
||||||
// Query functions are represented as two discreet functions: Map and Reduce. These roughly follow the MapReduce
|
// Query functions are represented as two discreet functions: Map and Reduce. These roughly follow the MapReduce
|
||||||
|
@ -12,6 +12,8 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/influxql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Iterator represents a forward-only iterator over a set of points.
|
// Iterator represents a forward-only iterator over a set of points.
|
||||||
|
@ -34,7 +36,7 @@ type ReduceFunc func([]interface{}) interface{}
|
||||||
type UnmarshalFunc func([]byte) (interface{}, error)
|
type UnmarshalFunc func([]byte) (interface{}, error)
|
||||||
|
|
||||||
// InitializeMapFunc takes an aggregate call from the query and returns the MapFunc
|
// InitializeMapFunc takes an aggregate call from the query and returns the MapFunc
|
||||||
func InitializeMapFunc(c *Call) (MapFunc, error) {
|
func InitializeMapFunc(c *influxql.Call) (MapFunc, error) {
|
||||||
// see if it's a query for raw data
|
// see if it's a query for raw data
|
||||||
if c == nil {
|
if c == nil {
|
||||||
return MapRawQuery, nil
|
return MapRawQuery, nil
|
||||||
|
@ -43,10 +45,10 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
|
||||||
// Retrieve map function by name.
|
// Retrieve map function by name.
|
||||||
switch c.Name {
|
switch c.Name {
|
||||||
case "count":
|
case "count":
|
||||||
if _, ok := c.Args[0].(*Distinct); ok {
|
if _, ok := c.Args[0].(*influxql.Distinct); ok {
|
||||||
return MapCountDistinct, nil
|
return MapCountDistinct, nil
|
||||||
}
|
}
|
||||||
if c, ok := c.Args[0].(*Call); ok {
|
if c, ok := c.Args[0].(*influxql.Call); ok {
|
||||||
if c.Name == "distinct" {
|
if c.Name == "distinct" {
|
||||||
return MapCountDistinct, nil
|
return MapCountDistinct, nil
|
||||||
}
|
}
|
||||||
|
@ -81,7 +83,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
|
||||||
case "derivative", "non_negative_derivative":
|
case "derivative", "non_negative_derivative":
|
||||||
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
||||||
// use the map func for that nested aggregate
|
// use the map func for that nested aggregate
|
||||||
if fn, ok := c.Args[0].(*Call); ok {
|
if fn, ok := c.Args[0].(*influxql.Call); ok {
|
||||||
return InitializeMapFunc(fn)
|
return InitializeMapFunc(fn)
|
||||||
}
|
}
|
||||||
return MapRawQuery, nil
|
return MapRawQuery, nil
|
||||||
|
@ -91,14 +93,14 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc
|
// InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc
|
||||||
func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
|
func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) {
|
||||||
// Retrieve reduce function by name.
|
// Retrieve reduce function by name.
|
||||||
switch c.Name {
|
switch c.Name {
|
||||||
case "count":
|
case "count":
|
||||||
if _, ok := c.Args[0].(*Distinct); ok {
|
if _, ok := c.Args[0].(*influxql.Distinct); ok {
|
||||||
return ReduceCountDistinct, nil
|
return ReduceCountDistinct, nil
|
||||||
}
|
}
|
||||||
if c, ok := c.Args[0].(*Call); ok {
|
if c, ok := c.Args[0].(*influxql.Call); ok {
|
||||||
if c.Name == "distinct" {
|
if c.Name == "distinct" {
|
||||||
return ReduceCountDistinct, nil
|
return ReduceCountDistinct, nil
|
||||||
}
|
}
|
||||||
|
@ -135,7 +137,7 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
|
||||||
case "derivative", "non_negative_derivative":
|
case "derivative", "non_negative_derivative":
|
||||||
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
||||||
// use the map func for that nested aggregate
|
// use the map func for that nested aggregate
|
||||||
if fn, ok := c.Args[0].(*Call); ok {
|
if fn, ok := c.Args[0].(*influxql.Call); ok {
|
||||||
return InitializeReduceFunc(fn)
|
return InitializeReduceFunc(fn)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("expected function argument to %s", c.Name)
|
return nil, fmt.Errorf("expected function argument to %s", c.Name)
|
||||||
|
@ -144,7 +146,7 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) {
|
func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) {
|
||||||
// if c is nil it's a raw data query
|
// if c is nil it's a raw data query
|
||||||
if c == nil {
|
if c == nil {
|
||||||
return func(b []byte) (interface{}, error) {
|
return func(b []byte) (interface{}, error) {
|
||||||
|
@ -1341,10 +1343,10 @@ func (t topReduceOut) Less(i, j int) bool {
|
||||||
// callArgs will get any additional field/tag names that may be needed to sort with
|
// callArgs will get any additional field/tag names that may be needed to sort with
|
||||||
// it is important to maintain the order of these that they were asked for in the call
|
// it is important to maintain the order of these that they were asked for in the call
|
||||||
// for sorting purposes
|
// for sorting purposes
|
||||||
func topCallArgs(c *Call) []string {
|
func topCallArgs(c *influxql.Call) []string {
|
||||||
var names []string
|
var names []string
|
||||||
for _, v := range c.Args[1 : len(c.Args)-1] {
|
for _, v := range c.Args[1 : len(c.Args)-1] {
|
||||||
if f, ok := v.(*VarRef); ok {
|
if f, ok := v.(*influxql.VarRef); ok {
|
||||||
names = append(names, f.Val)
|
names = append(names, f.Val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1352,9 +1354,9 @@ func topCallArgs(c *Call) []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapTop emits the top data points for each group by interval
|
// MapTop emits the top data points for each group by interval
|
||||||
func MapTop(itr Iterator, c *Call) interface{} {
|
func MapTop(itr Iterator, c *influxql.Call) interface{} {
|
||||||
// Capture the limit if it was specified in the call
|
// Capture the limit if it was specified in the call
|
||||||
lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral)
|
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
|
||||||
limit := int64(lit.Val)
|
limit := int64(lit.Val)
|
||||||
|
|
||||||
// Simple case where only value and limit are specified.
|
// Simple case where only value and limit are specified.
|
||||||
|
@ -1462,8 +1464,8 @@ func MapTop(itr Iterator, c *Call) interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReduceTop computes the top values for each key.
|
// ReduceTop computes the top values for each key.
|
||||||
func ReduceTop(values []interface{}, c *Call) interface{} {
|
func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
|
||||||
lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral)
|
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
|
||||||
limit := int64(lit.Val)
|
limit := int64(lit.Val)
|
||||||
|
|
||||||
out := positionOut{callArgs: topCallArgs(c)}
|
out := positionOut{callArgs: topCallArgs(c)}
|
||||||
|
@ -1501,10 +1503,10 @@ func MapEcho(itr Iterator) interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReducePercentile computes the percentile of values for each key.
|
// ReducePercentile computes the percentile of values for each key.
|
||||||
func ReducePercentile(values []interface{}, c *Call) interface{} {
|
func ReducePercentile(values []interface{}, c *influxql.Call) interface{} {
|
||||||
// Checks that this arg exists and is a valid type are done in the parsing validation
|
// Checks that this arg exists and is a valid type are done in the parsing validation
|
||||||
// and have test coverage there
|
// and have test coverage there
|
||||||
lit, _ := c.Args[1].(*NumberLiteral)
|
lit, _ := c.Args[1].(*influxql.NumberLiteral)
|
||||||
percentile := lit.Val
|
percentile := lit.Val
|
||||||
|
|
||||||
var allValues []float64
|
var allValues []float64
|
||||||
|
@ -1537,7 +1539,7 @@ func ReducePercentile(values []interface{}, c *Call) interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsNumeric returns whether a given aggregate can only be run on numeric fields.
|
// IsNumeric returns whether a given aggregate can only be run on numeric fields.
|
||||||
func IsNumeric(c *Call) bool {
|
func IsNumeric(c *influxql.Call) bool {
|
||||||
switch c.Name {
|
switch c.Name {
|
||||||
case "count", "first", "last", "distinct":
|
case "count", "first", "last", "distinct":
|
||||||
return false
|
return false
|
|
@ -1,4 +1,4 @@
|
||||||
package influxql
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -6,11 +6,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
"github.com/influxdb/influxdb/influxql"
|
||||||
)
|
)
|
||||||
|
|
||||||
import "sort"
|
import "sort"
|
||||||
|
|
||||||
type point struct {
|
type testPoint struct {
|
||||||
seriesKey string
|
seriesKey string
|
||||||
time int64
|
time int64
|
||||||
value interface{}
|
value interface{}
|
||||||
|
@ -18,7 +19,7 @@ type point struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type testIterator struct {
|
type testIterator struct {
|
||||||
values []point
|
values []testPoint
|
||||||
lastTags map[string]string
|
lastTags map[string]string
|
||||||
nextFunc func() (timestamp int64, value interface{})
|
nextFunc func() (timestamp int64, value interface{})
|
||||||
tagsFunc func() map[string]string
|
tagsFunc func() map[string]string
|
||||||
|
@ -63,17 +64,17 @@ func TestMapMeanNoValues(t *testing.T) {
|
||||||
func TestMapMean(t *testing.T) {
|
func TestMapMean(t *testing.T) {
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
input []point
|
input []testPoint
|
||||||
output *meanMapOutput
|
output *meanMapOutput
|
||||||
}{
|
}{
|
||||||
{ // Single point
|
{ // Single point
|
||||||
input: []point{point{"0", 1, 1.0, nil}},
|
input: []testPoint{testPoint{"0", 1, 1.0, nil}},
|
||||||
output: &meanMapOutput{1, 1, Float64Type},
|
output: &meanMapOutput{1, 1, Float64Type},
|
||||||
},
|
},
|
||||||
{ // Two points
|
{ // Two points
|
||||||
input: []point{
|
input: []testPoint{
|
||||||
point{"0", 1, 2.0, nil},
|
testPoint{"0", 1, 2.0, nil},
|
||||||
point{"0", 2, 8.0, nil},
|
testPoint{"0", 2, 8.0, nil},
|
||||||
},
|
},
|
||||||
output: &meanMapOutput{2, 5.0, Float64Type},
|
output: &meanMapOutput{2, 5.0, Float64Type},
|
||||||
},
|
},
|
||||||
|
@ -99,11 +100,11 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
|
||||||
|
|
||||||
for _, fn := range []string{"derivative", "non_negative_derivative"} {
|
for _, fn := range []string{"derivative", "non_negative_derivative"} {
|
||||||
// Single field arg should return MapEcho
|
// Single field arg should return MapEcho
|
||||||
c := &Call{
|
c := &influxql.Call{
|
||||||
Name: fn,
|
Name: fn,
|
||||||
Args: []Expr{
|
Args: []influxql.Expr{
|
||||||
&VarRef{Val: " field1"},
|
&influxql.VarRef{Val: " field1"},
|
||||||
&DurationLiteral{Val: time.Hour},
|
&influxql.DurationLiteral{Val: time.Hour},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,11 +114,11 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nested Aggregate func should return the map func for the nested aggregate
|
// Nested Aggregate func should return the map func for the nested aggregate
|
||||||
c = &Call{
|
c = &influxql.Call{
|
||||||
Name: fn,
|
Name: fn,
|
||||||
Args: []Expr{
|
Args: []influxql.Expr{
|
||||||
&Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}},
|
&influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}},
|
||||||
&DurationLiteral{Val: time.Hour},
|
&influxql.DurationLiteral{Val: time.Hour},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +136,7 @@ func TestReducePercentileNil(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReducePercentile should ignore nil values when calculating the percentile
|
// ReducePercentile should ignore nil values when calculating the percentile
|
||||||
got := ReducePercentile(input, &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 100}}})
|
got := ReducePercentile(input, &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 100}}})
|
||||||
if got != nil {
|
if got != nil {
|
||||||
t.Fatalf("ReducePercentile(100) returned wrong type. exp nil got %v", got)
|
t.Fatalf("ReducePercentile(100) returned wrong type. exp nil got %v", got)
|
||||||
}
|
}
|
||||||
|
@ -157,7 +158,7 @@ func TestMapDistinct(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
iter := &testIterator{
|
iter := &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{seriesKey1, timeId1, uint64(1), nil},
|
{seriesKey1, timeId1, uint64(1), nil},
|
||||||
{seriesKey1, timeId2, uint64(1), nil},
|
{seriesKey1, timeId2, uint64(1), nil},
|
||||||
{seriesKey1, timeId3, "1", nil},
|
{seriesKey1, timeId3, "1", nil},
|
||||||
|
@ -188,7 +189,7 @@ func TestMapDistinct(t *testing.T) {
|
||||||
|
|
||||||
func TestMapDistinctNil(t *testing.T) {
|
func TestMapDistinctNil(t *testing.T) {
|
||||||
iter := &testIterator{
|
iter := &testIterator{
|
||||||
values: []point{},
|
values: []testPoint{},
|
||||||
}
|
}
|
||||||
|
|
||||||
values := MapDistinct(iter)
|
values := MapDistinct(iter)
|
||||||
|
@ -311,7 +312,7 @@ func TestMapCountDistinct(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
iter := &testIterator{
|
iter := &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{seriesKey1, timeId1, uint64(1), nil},
|
{seriesKey1, timeId1, uint64(1), nil},
|
||||||
{seriesKey1, timeId2, uint64(1), nil},
|
{seriesKey1, timeId2, uint64(1), nil},
|
||||||
{seriesKey1, timeId3, "1", nil},
|
{seriesKey1, timeId3, "1", nil},
|
||||||
|
@ -342,7 +343,7 @@ func TestMapCountDistinct(t *testing.T) {
|
||||||
|
|
||||||
func TestMapCountDistinctNil(t *testing.T) {
|
func TestMapCountDistinctNil(t *testing.T) {
|
||||||
iter := &testIterator{
|
iter := &testIterator{
|
||||||
values: []point{},
|
values: []testPoint{},
|
||||||
}
|
}
|
||||||
|
|
||||||
values := MapCountDistinct(iter)
|
values := MapCountDistinct(iter)
|
||||||
|
@ -448,9 +449,9 @@ func TestGetSortedRange(t *testing.T) {
|
||||||
if len(results) != len(tt.expected) {
|
if len(results) != len(tt.expected) {
|
||||||
t.Errorf("Test %s error. Expected getSortedRange to return %v but got %v", tt.name, tt.expected, results)
|
t.Errorf("Test %s error. Expected getSortedRange to return %v but got %v", tt.name, tt.expected, results)
|
||||||
}
|
}
|
||||||
for i, point := range tt.expected {
|
for i, testPoint := range tt.expected {
|
||||||
if point != results[i] {
|
if testPoint != results[i] {
|
||||||
t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, point, results[i])
|
t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, testPoint, results[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -485,12 +486,12 @@ func TestMapTop(t *testing.T) {
|
||||||
skip bool
|
skip bool
|
||||||
iter *testIterator
|
iter *testIterator
|
||||||
exp positionOut
|
exp positionOut
|
||||||
call *Call
|
call *influxql.Call
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "int64 - basic",
|
name: "int64 - basic",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, int64(99), map[string]string{"host": "a"}},
|
{"", 10, int64(99), map[string]string{"host": "a"}},
|
||||||
{"", 10, int64(53), map[string]string{"host": "b"}},
|
{"", 10, int64(53), map[string]string{"host": "b"}},
|
||||||
{"", 20, int64(88), map[string]string{"host": "a"}},
|
{"", 20, int64(88), map[string]string{"host": "a"}},
|
||||||
|
@ -502,12 +503,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - basic with tag",
|
name: "int64 - basic with tag",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, int64(99), map[string]string{"host": "a"}},
|
{"", 10, int64(99), map[string]string{"host": "a"}},
|
||||||
{"", 20, int64(53), map[string]string{"host": "b"}},
|
{"", 20, int64(53), map[string]string{"host": "b"}},
|
||||||
{"", 30, int64(88), map[string]string{"host": "a"}},
|
{"", 30, int64(88), map[string]string{"host": "a"}},
|
||||||
|
@ -520,12 +521,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{20, int64(53), map[string]string{"host": "b"}},
|
PositionPoint{20, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - tie on value, resolve based on time",
|
name: "int64 - tie on value, resolve based on time",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 20, int64(99), map[string]string{"host": "a"}},
|
{"", 20, int64(99), map[string]string{"host": "a"}},
|
||||||
{"", 10, int64(53), map[string]string{"host": "a"}},
|
{"", 10, int64(53), map[string]string{"host": "a"}},
|
||||||
{"", 10, int64(99), map[string]string{"host": "a"}},
|
{"", 10, int64(99), map[string]string{"host": "a"}},
|
||||||
|
@ -538,12 +539,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - tie on value, time, resolve based on tags",
|
name: "int64 - tie on value, time, resolve based on tags",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, int64(99), map[string]string{"host": "b"}},
|
{"", 10, int64(99), map[string]string{"host": "b"}},
|
||||||
{"", 10, int64(99), map[string]string{"host": "a"}},
|
{"", 10, int64(99), map[string]string{"host": "a"}},
|
||||||
{"", 20, int64(88), map[string]string{"host": "a"}},
|
{"", 20, int64(88), map[string]string{"host": "a"}},
|
||||||
|
@ -556,12 +557,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(99), map[string]string{"host": "b"}},
|
PositionPoint{10, int64(99), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "mixed numerics - ints",
|
name: "mixed numerics - ints",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, int64(99), map[string]string{"host": "a"}},
|
{"", 10, int64(99), map[string]string{"host": "a"}},
|
||||||
{"", 10, int64(53), map[string]string{"host": "b"}},
|
{"", 10, int64(53), map[string]string{"host": "b"}},
|
||||||
{"", 20, uint64(88), map[string]string{"host": "a"}},
|
{"", 20, uint64(88), map[string]string{"host": "a"}},
|
||||||
|
@ -573,12 +574,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
|
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "mixed numerics - ints & floats",
|
name: "mixed numerics - ints & floats",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, float64(99), map[string]string{"host": "a"}},
|
{"", 10, float64(99), map[string]string{"host": "a"}},
|
||||||
{"", 10, int64(53), map[string]string{"host": "b"}},
|
{"", 10, int64(53), map[string]string{"host": "b"}},
|
||||||
{"", 20, uint64(88), map[string]string{"host": "a"}},
|
{"", 20, uint64(88), map[string]string{"host": "a"}},
|
||||||
|
@ -590,12 +591,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
|
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "mixed numerics - ints, floats, & strings",
|
name: "mixed numerics - ints, floats, & strings",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, float64(99), map[string]string{"host": "a"}},
|
{"", 10, float64(99), map[string]string{"host": "a"}},
|
||||||
{"", 10, int64(53), map[string]string{"host": "b"}},
|
{"", 10, int64(53), map[string]string{"host": "b"}},
|
||||||
{"", 20, "88", map[string]string{"host": "a"}},
|
{"", 20, "88", map[string]string{"host": "a"}},
|
||||||
|
@ -607,12 +608,12 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
|
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "bools",
|
name: "bools",
|
||||||
iter: &testIterator{
|
iter: &testIterator{
|
||||||
values: []point{
|
values: []testPoint{
|
||||||
{"", 10, true, map[string]string{"host": "a"}},
|
{"", 10, true, map[string]string{"host": "a"}},
|
||||||
{"", 10, true, map[string]string{"host": "b"}},
|
{"", 10, true, map[string]string{"host": "b"}},
|
||||||
{"", 20, false, map[string]string{"host": "a"}},
|
{"", 20, false, map[string]string{"host": "a"}},
|
||||||
|
@ -624,7 +625,7 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{10, true, map[string]string{"host": "b"}},
|
PositionPoint{10, true, map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -649,7 +650,7 @@ func TestReduceTop(t *testing.T) {
|
||||||
skip bool
|
skip bool
|
||||||
values []interface{}
|
values []interface{}
|
||||||
exp PositionPoints
|
exp PositionPoints
|
||||||
call *Call
|
call *influxql.Call
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "int64 - single map",
|
name: "int64 - single map",
|
||||||
|
@ -664,7 +665,7 @@ func TestReduceTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
||||||
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - double map",
|
name: "int64 - double map",
|
||||||
|
@ -681,7 +682,7 @@ func TestReduceTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
||||||
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - double map with nil",
|
name: "int64 - double map with nil",
|
||||||
|
@ -697,7 +698,7 @@ func TestReduceTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
||||||
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - double map with non-matching tags and tag selected",
|
name: "int64 - double map with non-matching tags and tag selected",
|
||||||
|
@ -713,7 +714,7 @@ func TestReduceTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
||||||
PositionPoint{20, int64(88), map[string]string{}},
|
PositionPoint{20, int64(88), map[string]string{}},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
skip: true,
|
skip: true,
|
||||||
|
@ -730,7 +731,7 @@ func TestReduceTop(t *testing.T) {
|
||||||
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
|
||||||
PositionPoint{20, int64(55), map[string]string{"host": "b"}},
|
PositionPoint{20, int64(55), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ type SelectMapper struct {
|
||||||
intervalSize int64 // Size of each interval.
|
intervalSize int64 // Size of each interval.
|
||||||
numIntervals int // Maximum number of intervals to return.
|
numIntervals int // Maximum number of intervals to return.
|
||||||
currInterval int // Current interval for which data is being fetched.
|
currInterval int // Current interval for which data is being fetched.
|
||||||
mapFuncs []influxql.MapFunc // The mapping functions.
|
mapFuncs []MapFunc // The mapping functions.
|
||||||
fieldNames []string // the field name being read for mapping.
|
fieldNames []string // the field name being read for mapping.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,10 +500,10 @@ func (lm *SelectMapper) initializeMapFunctions() error {
|
||||||
var err error
|
var err error
|
||||||
// Set up each mapping function for this statement.
|
// Set up each mapping function for this statement.
|
||||||
aggregates := lm.selectStmt.FunctionCalls()
|
aggregates := lm.selectStmt.FunctionCalls()
|
||||||
lm.mapFuncs = make([]influxql.MapFunc, len(aggregates))
|
lm.mapFuncs = make([]MapFunc, len(aggregates))
|
||||||
lm.fieldNames = make([]string, len(lm.mapFuncs))
|
lm.fieldNames = make([]string, len(lm.mapFuncs))
|
||||||
for i, c := range aggregates {
|
for i, c := range aggregates {
|
||||||
lm.mapFuncs[i], err = influxql.InitializeMapFunc(c)
|
lm.mapFuncs[i], err = InitializeMapFunc(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -289,7 +289,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt
|
||||||
|
|
||||||
switch lit := nested.Args[0].(type) {
|
switch lit := nested.Args[0].(type) {
|
||||||
case *influxql.VarRef:
|
case *influxql.VarRef:
|
||||||
if influxql.IsNumeric(nested) {
|
if IsNumeric(nested) {
|
||||||
f := m.Fields[lit.Val]
|
f := m.Fields[lit.Val]
|
||||||
if err := validateType(a.Name, f.Name, f.Type); err != nil {
|
if err := validateType(a.Name, f.Name, f.Type); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -299,7 +299,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt
|
||||||
if nested.Name != "count" {
|
if nested.Name != "count" {
|
||||||
return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
|
return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
|
||||||
}
|
}
|
||||||
if influxql.IsNumeric(nested) {
|
if IsNumeric(nested) {
|
||||||
f := m.Fields[lit.Val]
|
f := m.Fields[lit.Val]
|
||||||
if err := validateType(a.Name, f.Name, f.Type); err != nil {
|
if err := validateType(a.Name, f.Name, f.Type); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue