parent
a0a4600e7f
commit
98521b273e
|
@ -646,7 +646,7 @@ type SelectStatement struct {
|
||||||
// derivative aggregate
|
// derivative aggregate
|
||||||
func (s *SelectStatement) HasDerivative() bool {
|
func (s *SelectStatement) HasDerivative() bool {
|
||||||
for _, f := range s.Fields {
|
for _, f := range s.Fields {
|
||||||
if f.Name() == "derivative" {
|
if strings.HasSuffix(f.Name(), "derivative") {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -657,7 +657,7 @@ func (s *SelectStatement) HasDerivative() bool {
|
||||||
// variable ref as the first arg
|
// variable ref as the first arg
|
||||||
func (s *SelectStatement) IsSimpleDerivative() bool {
|
func (s *SelectStatement) IsSimpleDerivative() bool {
|
||||||
for _, f := range s.Fields {
|
for _, f := range s.Fields {
|
||||||
if f.Name() == "derivative" {
|
if strings.HasSuffix(f.Name(), "derivative") {
|
||||||
// cast to derivative call
|
// cast to derivative call
|
||||||
if d, ok := f.Expr.(*Call); ok {
|
if d, ok := f.Expr.(*Call); ok {
|
||||||
|
|
||||||
|
|
|
@ -364,6 +364,10 @@ func (m *MapReduceJob) derivativeInterval() time.Duration {
|
||||||
return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val
|
return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MapReduceJob) isNonNegativeDerivative() bool {
|
||||||
|
return m.stmt.FunctionCalls()[0].Name == "non_negative_derivative"
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput {
|
func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput {
|
||||||
// If we're called and do not have a derivative aggregate function, then return what was passed in
|
// If we're called and do not have a derivative aggregate function, then return what was passed in
|
||||||
if !m.stmt.HasDerivative() {
|
if !m.stmt.HasDerivative() {
|
||||||
|
@ -389,24 +393,31 @@ func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *raw
|
||||||
lastValueFromPreviousChunk = valuesToReturn[0]
|
lastValueFromPreviousChunk = valuesToReturn[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
// The duration to normalize the derivative by. This is so the derivative values
|
// Determines whether to drop negative differences
|
||||||
// can be expressed as "per second", etc.. within each time segment
|
isNonNegative := m.isNonNegativeDerivative()
|
||||||
interval := m.derivativeInterval()
|
|
||||||
|
|
||||||
derivativeValues := make([]*rawQueryMapOutput, len(valuesToReturn)-1)
|
derivativeValues := []*rawQueryMapOutput{}
|
||||||
for i := 1; i < len(valuesToReturn); i++ {
|
for i := 1; i < len(valuesToReturn); i++ {
|
||||||
v := valuesToReturn[i]
|
v := valuesToReturn[i]
|
||||||
|
|
||||||
// Calculate the derivate of successive points by dividing the difference
|
// Calculate the derivative of successive points by dividing the difference
|
||||||
// of each value by the elapsed time normalized to the interval
|
// of each value by the elapsed time normalized to the interval
|
||||||
|
var value interface{}
|
||||||
diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64)
|
diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64)
|
||||||
elapsed := v.Time - lastValueFromPreviousChunk.Time
|
elapsed := v.Time - lastValueFromPreviousChunk.Time
|
||||||
|
value = diff / (float64(elapsed) / float64(m.derivativeInterval()))
|
||||||
|
|
||||||
derivativeValues[i-1] = &rawQueryMapOutput{
|
|
||||||
Time: v.Time,
|
|
||||||
Values: diff / (float64(elapsed) / float64(interval)),
|
|
||||||
}
|
|
||||||
lastValueFromPreviousChunk = v
|
lastValueFromPreviousChunk = v
|
||||||
|
|
||||||
|
// Drop negative values for non-negative derivatives
|
||||||
|
if isNonNegative && diff < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
derivativeValues = append(derivativeValues, &rawQueryMapOutput{
|
||||||
|
Time: v.Time,
|
||||||
|
Values: value,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return derivativeValues
|
return derivativeValues
|
||||||
|
@ -433,27 +444,36 @@ func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Determines whether to drop negative differences
|
||||||
|
isNonNegative := m.isNonNegativeDerivative()
|
||||||
|
|
||||||
// Otherwise calculate the derivatives as the difference between consequtive
|
// Otherwise calculate the derivatives as the difference between consequtive
|
||||||
// points divided by the elapsed time. Then normalize to the requested
|
// points divided by the elapsed time. Then normalize to the requested
|
||||||
// interval.
|
// interval.
|
||||||
derivatives := make([][]interface{}, len(results)-1)
|
derivatives := [][]interface{}{}
|
||||||
for i := 1; i < len(results); i++ {
|
for i := 1; i < len(results); i++ {
|
||||||
prev := results[i-1]
|
prev := results[i-1]
|
||||||
cur := results[i]
|
cur := results[i]
|
||||||
|
|
||||||
if cur[1] == nil || prev[1] == nil {
|
if cur[1] == nil || prev[1] == nil {
|
||||||
derivatives[i-1] = cur
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var value interface{}
|
||||||
elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time))
|
elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time))
|
||||||
diff := cur[1].(float64) - prev[1].(float64)
|
diff := cur[1].(float64) - prev[1].(float64)
|
||||||
|
value = float64(diff) / (float64(elapsed) / float64(m.derivativeInterval()))
|
||||||
|
|
||||||
|
// Drop negative values for non-negative derivatives
|
||||||
|
if isNonNegative && diff < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
val := []interface{}{
|
val := []interface{}{
|
||||||
cur[0],
|
cur[0],
|
||||||
float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())),
|
value,
|
||||||
}
|
}
|
||||||
derivatives[i-1] = val
|
derivatives = append(derivatives, val)
|
||||||
}
|
}
|
||||||
|
|
||||||
return derivatives
|
return derivatives
|
||||||
|
|
|
@ -0,0 +1,383 @@
|
||||||
|
package influxql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func derivativeJob(t *testing.T, fn, interval string) *MapReduceJob {
|
||||||
|
q, err := ParseQuery(fmt.Sprintf("SELECT %s(mean(value), %s) FROM foo", fn, interval))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to parse query: %s", err)
|
||||||
|
}
|
||||||
|
m := &MapReduceJob{
|
||||||
|
stmt: q.Statements[0].(*SelectStatement),
|
||||||
|
}
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessDerivative(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fn string
|
||||||
|
interval string
|
||||||
|
in [][]interface{}
|
||||||
|
exp [][]interface{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty input",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: [][]interface{}{},
|
||||||
|
exp: [][]interface{}{},
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
name: "single row returns 0.0",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0), 1.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0), 0.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "basic derivative",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0), 1.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 3.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), 5.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 9.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), 2.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "12h interval",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "12h",
|
||||||
|
in: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0), 1.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), 3.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 0.5,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), 0.5,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 0.5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "negative derivatives",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0), 1.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), 0.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 1.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), -2.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "negative derivatives",
|
||||||
|
fn: "non_negative_derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0), 1.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 2.0,
|
||||||
|
},
|
||||||
|
// Show resultes in negative derivative
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(48 * time.Hour), 0.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: [][]interface{}{
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(24 * time.Hour), 1.0,
|
||||||
|
},
|
||||||
|
[]interface{}{
|
||||||
|
time.Unix(0, 0).Add(72 * time.Hour), 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
m := derivativeJob(t, test.fn, test.interval)
|
||||||
|
got := m.processDerivative(test.in)
|
||||||
|
|
||||||
|
if len(got) != len(test.exp) {
|
||||||
|
t.Fatalf("processDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(test.exp); i++ {
|
||||||
|
if test.exp[i][0] != got[i][0] || test.exp[i][1] != got[i][1] {
|
||||||
|
t.Fatalf("processDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessRawQueryDerivative(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fn string
|
||||||
|
interval string
|
||||||
|
in []*rawQueryMapOutput
|
||||||
|
exp []*rawQueryMapOutput
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty input",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: []*rawQueryMapOutput{},
|
||||||
|
exp: []*rawQueryMapOutput{},
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
name: "single row returns 0.0",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Unix(),
|
||||||
|
Values: 1.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Unix(),
|
||||||
|
Values: 0.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "basic derivative",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Unix(),
|
||||||
|
Values: 0.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 3.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: 5.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 9.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 2.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: 2.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "12h interval",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "12h",
|
||||||
|
in: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).UnixNano(),
|
||||||
|
Values: 1.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 2.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: 3.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 0.5,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: 0.5,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 0.5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "negative derivatives",
|
||||||
|
fn: "derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Unix(),
|
||||||
|
Values: 1.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 2.0,
|
||||||
|
},
|
||||||
|
// should go negative
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: 0.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 1.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: -2.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "negative derivatives",
|
||||||
|
fn: "non_negative_derivative",
|
||||||
|
interval: "1d",
|
||||||
|
in: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Unix(),
|
||||||
|
Values: 1.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 2.0,
|
||||||
|
},
|
||||||
|
// should go negative
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
|
||||||
|
Values: 0.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []*rawQueryMapOutput{
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
|
||||||
|
Values: 1.0,
|
||||||
|
},
|
||||||
|
&rawQueryMapOutput{
|
||||||
|
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
|
||||||
|
Values: 4.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
m := derivativeJob(t, test.fn, test.interval)
|
||||||
|
got := m.processRawQueryDerivative(nil, test.in)
|
||||||
|
|
||||||
|
if len(got) != len(test.exp) {
|
||||||
|
t.Fatalf("processRawQueryDerivative(%s) - %s\nlen mismatch: got %d, exp %d", test.fn, test.name, len(got), len(test.exp))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(test.exp); i++ {
|
||||||
|
if test.exp[i].Time != got[i].Time || (test.exp[i].Values.(float64)-got[i].Values.(float64)) > 0.000000001 {
|
||||||
|
t.Fatalf("processRawQueryDerivative - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Iterator represents a forward-only iterator over a set of points.
|
// Iterator represents a forward-only iterator over a set of points.
|
||||||
|
@ -39,7 +40,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that there is either a single argument or if for percentile, two
|
// Ensure that there is either a single argument or if for percentile, two
|
||||||
if c.Name == "percentile" || c.Name == "derivative" {
|
if c.Name == "percentile" || strings.HasSuffix(c.Name, "derivative") {
|
||||||
if len(c.Args) != 2 {
|
if len(c.Args) != 2 {
|
||||||
return nil, fmt.Errorf("expected two arguments for %s()", c.Name)
|
return nil, fmt.Errorf("expected two arguments for %s()", c.Name)
|
||||||
}
|
}
|
||||||
|
@ -49,7 +50,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
|
||||||
|
|
||||||
// derivative can take a nested aggregate function, everything else expects
|
// derivative can take a nested aggregate function, everything else expects
|
||||||
// a variable reference as the first arg
|
// a variable reference as the first arg
|
||||||
if c.Name != "derivative" {
|
if !strings.HasSuffix(c.Name, "derivative") {
|
||||||
// Ensure the argument is a variable reference.
|
// Ensure the argument is a variable reference.
|
||||||
_, ok := c.Args[0].(*VarRef)
|
_, ok := c.Args[0].(*VarRef)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -85,7 +86,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
|
||||||
return nil, fmt.Errorf("expected float argument in percentile()")
|
return nil, fmt.Errorf("expected float argument in percentile()")
|
||||||
}
|
}
|
||||||
return MapEcho, nil
|
return MapEcho, nil
|
||||||
case "derivative":
|
case "derivative", "non_negative_derivative":
|
||||||
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
||||||
// use the map func for that nested aggregate
|
// use the map func for that nested aggregate
|
||||||
if fn, ok := c.Args[0].(*Call); ok {
|
if fn, ok := c.Args[0].(*Call); ok {
|
||||||
|
@ -131,13 +132,13 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
|
||||||
return nil, fmt.Errorf("expected float argument in percentile()")
|
return nil, fmt.Errorf("expected float argument in percentile()")
|
||||||
}
|
}
|
||||||
return ReducePercentile(lit.Val), nil
|
return ReducePercentile(lit.Val), nil
|
||||||
case "derivative":
|
case "derivative", "non_negative_derivative":
|
||||||
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
// If the arg is another aggregate e.g. derivative(mean(value)), then
|
||||||
// use the map func for that nested aggregate
|
// use the map func for that nested aggregate
|
||||||
if fn, ok := c.Args[0].(*Call); ok {
|
if fn, ok := c.Args[0].(*Call); ok {
|
||||||
return InitializeReduceFunc(fn)
|
return InitializeReduceFunc(fn)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("expected function argument to derivative: %q", c.Name)
|
return nil, fmt.Errorf("expected function argument to %s", c.Name)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("function not found: %q", c.Name)
|
return nil, fmt.Errorf("function not found: %q", c.Name)
|
||||||
}
|
}
|
||||||
|
@ -792,6 +793,10 @@ type rawQueryMapOutput struct {
|
||||||
Values interface{}
|
Values interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *rawQueryMapOutput) String() string {
|
||||||
|
return fmt.Sprintf("{%#v %#v}", r.Time, r.Values)
|
||||||
|
}
|
||||||
|
|
||||||
type rawOutputs []*rawQueryMapOutput
|
type rawOutputs []*rawQueryMapOutput
|
||||||
|
|
||||||
func (a rawOutputs) Len() int { return len(a) }
|
func (a rawOutputs) Len() int { return len(a) }
|
||||||
|
|
|
@ -104,9 +104,11 @@ func TestInitializeMapFuncPercentile(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInitializeMapFuncDerivative(t *testing.T) {
|
func TestInitializeMapFuncDerivative(t *testing.T) {
|
||||||
|
|
||||||
|
for _, fn := range []string{"derivative", "non_negative_derivative"} {
|
||||||
// No args should fail
|
// No args should fail
|
||||||
c := &Call{
|
c := &Call{
|
||||||
Name: "derivative",
|
Name: fn,
|
||||||
Args: []Expr{},
|
Args: []Expr{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +119,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
|
||||||
|
|
||||||
// Single field arg should return MapEcho
|
// Single field arg should return MapEcho
|
||||||
c = &Call{
|
c = &Call{
|
||||||
Name: "derivative",
|
Name: fn,
|
||||||
Args: []Expr{
|
Args: []Expr{
|
||||||
&VarRef{Val: " field1"},
|
&VarRef{Val: " field1"},
|
||||||
&DurationLiteral{Val: time.Hour},
|
&DurationLiteral{Val: time.Hour},
|
||||||
|
@ -131,7 +133,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
|
||||||
|
|
||||||
// Nested Aggregate func should return the map func for the nested aggregate
|
// Nested Aggregate func should return the map func for the nested aggregate
|
||||||
c = &Call{
|
c = &Call{
|
||||||
Name: "derivative",
|
Name: fn,
|
||||||
Args: []Expr{
|
Args: []Expr{
|
||||||
&Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}},
|
&Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}},
|
||||||
&DurationLiteral{Val: time.Hour},
|
&DurationLiteral{Val: time.Hour},
|
||||||
|
@ -142,6 +144,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
|
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInitializeReduceFuncPercentile(t *testing.T) {
|
func TestInitializeReduceFuncPercentile(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue