Merge pull request #9791 from influxdata/js-spread-stream-function

Optimize the spread function to process points iteratively instead of in batch
pull/9797/head
Jonathan A. Sternberg 2018-05-01 15:08:34 -05:00 committed by GitHub
commit 10ed277e7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 45 deletions

View File

@ -934,19 +934,19 @@ func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(FloatSpreadReduceSlice)
fn := NewFloatSpreadReducer()
return fn, fn
}
return newFloatReduceFloatIterator(input, opt, createFn), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(IntegerSpreadReduceSlice)
fn := NewIntegerSpreadReducer()
return fn, fn
}
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
case UnsignedIterator:
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
fn := NewUnsignedSliceFuncReducer(UnsignedSpreadReduceSlice)
fn := NewUnsignedSpreadReducer()
return fn, fn
}
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
@ -955,47 +955,6 @@ func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
}
}
// FloatSpreadReduceSlice returns the spread value within a window.
func FloatSpreadReduceSlice(a []FloatPoint) []FloatPoint {
// Find min & max values.
min, max := a[0].Value, a[0].Value
for _, p := range a[1:] {
min = math.Min(min, p.Value)
max = math.Max(max, p.Value)
}
return []FloatPoint{{Time: ZeroTime, Value: max - min}}
}
// IntegerSpreadReduceSlice returns the spread value within a window.
func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint {
// Find min & max values.
min, max := a[0].Value, a[0].Value
for _, p := range a[1:] {
if p.Value < min {
min = p.Value
}
if p.Value > max {
max = p.Value
}
}
return []IntegerPoint{{Time: ZeroTime, Value: max - min}}
}
// UnsignedSpreadReduceSlice returns the spread value within a window.
func UnsignedSpreadReduceSlice(a []UnsignedPoint) []UnsignedPoint {
// Find min & max values.
min, max := a[0].Value, a[0].Value
for _, p := range a[1:] {
if p.Value < min {
min = p.Value
}
if p.Value > max {
max = p.Value
}
}
return []UnsignedPoint{{Time: ZeroTime, Value: max - min}}
}
func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:

View File

@ -176,6 +176,92 @@ func (r *UnsignedMeanReducer) Emit() []FloatPoint {
}}
}
type FloatSpreadReducer struct {
min, max float64
count uint32
}
func NewFloatSpreadReducer() *FloatSpreadReducer {
return &FloatSpreadReducer{
min: math.Inf(1),
max: math.Inf(-1),
}
}
func (r *FloatSpreadReducer) AggregateFloat(p *FloatPoint) {
r.min = math.Min(r.min, p.Value)
r.max = math.Max(r.max, p.Value)
r.count++
}
func (r *FloatSpreadReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
Value: r.max - r.min,
Aggregated: r.count,
}}
}
type IntegerSpreadReducer struct {
min, max int64
count uint32
}
func NewIntegerSpreadReducer() *IntegerSpreadReducer {
return &IntegerSpreadReducer{
min: math.MaxInt64,
max: math.MinInt64,
}
}
func (r *IntegerSpreadReducer) AggregateInteger(p *IntegerPoint) {
if p.Value < r.min {
r.min = p.Value
}
if p.Value > r.max {
r.max = p.Value
}
r.count++
}
func (r *IntegerSpreadReducer) Emit() []IntegerPoint {
return []IntegerPoint{{
Time: ZeroTime,
Value: r.max - r.min,
Aggregated: r.count,
}}
}
type UnsignedSpreadReducer struct {
min, max uint64
count uint32
}
func NewUnsignedSpreadReducer() *UnsignedSpreadReducer {
return &UnsignedSpreadReducer{
min: math.MaxUint64,
max: 0,
}
}
func (r *UnsignedSpreadReducer) AggregateUnsigned(p *UnsignedPoint) {
if p.Value < r.min {
r.min = p.Value
}
if p.Value > r.max {
r.max = p.Value
}
r.count++
}
func (r *UnsignedSpreadReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{{
Time: ZeroTime,
Value: r.max - r.min,
Aggregated: r.count,
}}
}
// FloatDerivativeReducer calculates the derivative of the aggregated points.
type FloatDerivativeReducer struct {
interval Interval

View File

@ -650,7 +650,10 @@ func (m *measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Ex
}
} else if n.Op == influxql.NEQ {
if str.Val != "" {
ids = m.SeriesIDs().Reject(tagVals.Load(str.Val))
ids = m.SeriesIDs()
if vals := tagVals.Load(str.Val); len(vals) > 0 {
ids = ids.Reject(vals)
}
} else {
tagVals.RangeAll(func(_ string, a seriesIDs) {
ids = append(ids, a...)