introduce a new function non_negative_difference

pull/8235/head
zhexuany 2017-03-31 10:26:14 +08:00
parent a221e32291
commit 232fdae6dd
7 changed files with 212 additions and 19 deletions

View File

@ -13,6 +13,7 @@
- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL.
- [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries.
- [#8194](https://github.com/influxdata/influxdb/pull/8194): Add "integral" function to InfluxQL.
- [#7393](https://github.com/influxdata/influxdb/issues/7393): Add "non_negative_difference" function to InfluxQL.
### Bugfixes

View File

@ -1926,7 +1926,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields {
for _, expr := range walkFunctionCalls(f.Expr) {
switch expr.Name {
case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed":
case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "cumulative_sum", "elapsed":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
@ -1942,7 +1942,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return fmt.Errorf("second argument to %s must be a duration, got %T", expr.Name, expr.Args[1])
}
}
case "difference", "cumulative_sum":
case "difference", "non_negative_difference", "cumulative_sum":
if got := len(expr.Args); got != 1 {
return fmt.Errorf("invalid number of arguments for %s, expected 1, got %d", expr.Name, got)
}
@ -2162,7 +2162,7 @@ func (s *SelectStatement) validateGroupByInterval() error {
switch expr := f.Expr.(type) {
case *Call:
switch expr.Name {
case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed", "holt_winters", "holt_winters_with_fit":
case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "cumulative_sum", "elapsed", "holt_winters", "holt_winters_with_fit":
// If the first argument is a call, we needed a group by interval and we don't have one.
if _, ok := expr.Args[0].(*Call); ok {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name)

View File

@ -1142,17 +1142,17 @@ func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interva
}
// newDifferenceIterator returns an iterator for operating on a difference() call.
func newDifferenceIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
func newDifferenceIterator(input Iterator, opt IteratorOptions, isNonNegative bool) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatDifferenceReducer()
fn := NewFloatDifferenceReducer(isNonNegative)
return fn, fn
}
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerDifferenceReducer()
fn := NewIntegerDifferenceReducer(isNonNegative)
return fn, fn
}
return newIntegerStreamIntegerIterator(input, createFn, opt), nil

View File

@ -185,15 +185,17 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
// FloatDifferenceReducer calculates the derivative of the aggregated points.
type FloatDifferenceReducer struct {
prev FloatPoint
curr FloatPoint
isNonNegative bool
prev FloatPoint
curr FloatPoint
}
// NewFloatDifferenceReducer creates a new FloatDifferenceReducer.
func NewFloatDifferenceReducer() *FloatDifferenceReducer {
func NewFloatDifferenceReducer(isNonNegative bool) *FloatDifferenceReducer {
return &FloatDifferenceReducer{
prev: FloatPoint{Nil: true},
curr: FloatPoint{Nil: true},
isNonNegative: isNonNegative,
prev: FloatPoint{Nil: true},
curr: FloatPoint{Nil: true},
}
}
@ -216,6 +218,12 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value
// If it is non_negative_difference discard any negative value. Since
// prev is still marked as unread. The correctness can be ensured.
if r.isNonNegative && value < 0 {
return nil
}
// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []FloatPoint{{Time: r.curr.Time, Value: value}}
@ -225,15 +233,17 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint {
// IntegerDifferenceReducer calculates the derivative of the aggregated points.
type IntegerDifferenceReducer struct {
prev IntegerPoint
curr IntegerPoint
isNonNegative bool
prev IntegerPoint
curr IntegerPoint
}
// NewIntegerDifferenceReducer creates a new IntegerDifferenceReducer.
func NewIntegerDifferenceReducer() *IntegerDifferenceReducer {
func NewIntegerDifferenceReducer(isNonNegative bool) *IntegerDifferenceReducer {
return &IntegerDifferenceReducer{
prev: IntegerPoint{Nil: true},
curr: IntegerPoint{Nil: true},
isNonNegative: isNonNegative,
prev: IntegerPoint{Nil: true},
curr: IntegerPoint{Nil: true},
}
}
@ -256,8 +266,15 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value
// If it is non_negative_difference discard any negative value. Since
// prev is still marked as unread. The correctness can be ensured.
if r.isNonNegative && value < 0 {
return nil
}
// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []IntegerPoint{{Time: r.curr.Time, Value: value}}
}
return nil

View File

@ -314,6 +314,56 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// non_negative_difference
{
s: `SELECT non_negative_difference(field1) FROM myseries;`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "non_negative_difference", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
},
},
{
s: fmt.Sprintf(`SELECT non_negative_difference(max(field1)) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{
Expr: &influxql.Call{
Name: "non_negative_difference",
Args: []influxql.Expr{
&influxql.Call{
Name: "max",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
},
},
},
},
},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: time.Minute},
},
},
},
},
Condition: &influxql.BinaryExpr{
Op: influxql.GT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.StringLiteral{Val: now.UTC().Format(time.RFC3339Nano)},
},
},
},
// moving_average
{
s: `SELECT moving_average(field1, 3) FROM myseries;`,

View File

@ -748,7 +748,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
opt.Interval = Interval{}
return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeFitData, interval)
case "derivative", "non_negative_derivative", "difference", "moving_average", "elapsed":
case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "elapsed":
opt := b.opt
if !opt.Interval.IsZero() {
if opt.Ascending {
@ -772,8 +772,9 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
case "elapsed":
interval := opt.ElapsedInterval()
return newElapsedIterator(input, opt, interval)
case "difference":
return newDifferenceIterator(input, opt)
case "difference", "non_negative_difference":
isNonNegative := (expr.Name == "non_negative_difference")
return newDifferenceIterator(input, opt, isNonNegative)
case "moving_average":
n := expr.Args[1].(*IntegerLiteral)
if n.Val > 1 && !b.opt.Interval.IsZero() {

View File

@ -2814,6 +2814,130 @@ func TestSelect_Difference_Duplicate_Integer(t *testing.T) {
}
}
func TestSelect_Non_Negative_Difference_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 8 * Second, Value: 29},
{Name: "cpu", Time: 12 * Second, Value: 3},
{Name: "cpu", Time: 16 * Second, Value: 39},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 19}},
{&influxql.FloatPoint{Name: "cpu", Time: 16 * Second, Value: 36}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Non_Negative_Difference_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 8 * Second, Value: 21},
{Name: "cpu", Time: 12 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 8 * Second, Value: 11}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Non_Negative_Difference_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
{Name: "cpu", Time: 8 * Second, Value: 30},
{Name: "cpu", Time: 8 * Second, Value: 19},
{Name: "cpu", Time: 12 * Second, Value: 10},
{Name: "cpu", Time: 12 * Second, Value: 3},
{Name: "cpu", Time: 16 * Second, Value: 40},
{Name: "cpu", Time: 16 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 20}},
{&influxql.FloatPoint{Name: "cpu", Time: 16 * Second, Value: 30}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Non_Negative_Difference_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if m.Name != "cpu" {
t.Fatalf("unexpected source: %s", m.Name)
}
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
{Name: "cpu", Time: 8 * Second, Value: 30},
{Name: "cpu", Time: 8 * Second, Value: 19},
{Name: "cpu", Time: 12 * Second, Value: 10},
{Name: "cpu", Time: 12 * Second, Value: 3},
{Name: "cpu", Time: 16 * Second, Value: 40},
{Name: "cpu", Time: 16 * Second, Value: 3},
}}, nil
}
// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 8 * Second, Value: 20}},
{&influxql.IntegerPoint{Name: "cpu", Time: 16 * Second, Value: 30}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
func TestSelect_Elapsed_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) {