parent
a308259153
commit
a05e2b164e
|
@ -21,6 +21,7 @@
|
|||
- [#6483](https://github.com/influxdata/influxdb/pull/6483): Delete series support for TSM
|
||||
- [#6484](https://github.com/influxdata/influxdb/pull/6484): Query language support for DELETE
|
||||
- [#6290](https://github.com/influxdata/influxdb/issues/6290): Add POST /query endpoint and warning messages for using GET with write operations.
|
||||
- [#6494](https://github.com/influxdata/influxdb/issues/6494): Support booleans for min() and max().
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -137,6 +137,12 @@ func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
|
||||
case BooleanIterator:
|
||||
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
||||
fn := NewBooleanFuncReducer(BooleanMinReduce)
|
||||
return fn, fn
|
||||
}
|
||||
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported min iterator type: %T", input)
|
||||
}
|
||||
|
@ -158,6 +164,14 @@ func IntegerMinReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
|||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// BooleanMinReduce returns the minimum value between prev & curr.
|
||||
func BooleanMinReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
||||
if prev == nil || (curr.Value != prev.Value && !curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
||||
return curr.Time, curr.Value, curr.Aux
|
||||
}
|
||||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// newMaxIterator returns an iterator for operating on a max() call.
|
||||
func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
|
@ -173,6 +187,12 @@ func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
|
||||
case BooleanIterator:
|
||||
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
||||
fn := NewBooleanFuncReducer(BooleanMaxReduce)
|
||||
return fn, fn
|
||||
}
|
||||
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported max iterator type: %T", input)
|
||||
}
|
||||
|
@ -194,6 +214,14 @@ func IntegerMaxReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
|||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// BooleanMaxReduce returns the minimum value between prev & curr.
|
||||
func BooleanMaxReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
||||
if prev == nil || (curr.Value != prev.Value && curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
||||
return curr.Time, curr.Value, curr.Aux
|
||||
}
|
||||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// newSumIterator returns an iterator for operating on a sum() call.
|
||||
func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
|
|
|
@ -211,6 +211,38 @@ func TestCallIterator_Min_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a boolean iterator can be created for a min() call.
|
||||
func TestCallIterator_Min_Boolean(t *testing.T) {
|
||||
itr, _ := influxql.NewCallIterator(
|
||||
&BooleanIterator{Points: []influxql.BooleanPoint{
|
||||
{Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
|
||||
{Time: 5, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 23, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
influxql.IteratorOptions{
|
||||
Expr: MustParseExpr(`min("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.BooleanPoint{Time: 2, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&influxql.BooleanPoint{Time: 1, Value: false, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&influxql.BooleanPoint{Time: 23, Value: true, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a float iterator can be created for a max() call.
|
||||
func TestCallIterator_Max_Float(t *testing.T) {
|
||||
itr, _ := influxql.NewCallIterator(
|
||||
|
@ -275,6 +307,38 @@ func TestCallIterator_Max_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a boolean iterator can be created for a max() call.
|
||||
func TestCallIterator_Max_Boolean(t *testing.T) {
|
||||
itr, _ := influxql.NewCallIterator(
|
||||
&BooleanIterator{Points: []influxql.BooleanPoint{
|
||||
{Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 2, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
|
||||
{Time: 5, Value: false, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 23, Value: true, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
influxql.IteratorOptions{
|
||||
Expr: MustParseExpr(`max("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: influxql.Interval{Duration: 5 * time.Nanosecond},
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&influxql.BooleanPoint{Time: 1, Value: false, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&influxql.BooleanPoint{Time: 23, Value: true, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a float iterator can be created for a sum() call.
|
||||
func TestCallIterator_Sum_Float(t *testing.T) {
|
||||
itr, _ := influxql.NewCallIterator(
|
||||
|
|
Loading…
Reference in New Issue