diff --git a/CHANGELOG.md b/CHANGELOG.md index cdecc57ee3..00c45beb29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,8 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21972](https://github.com/influxdata/influxdb/pull/21972): Added support for notebooks and annotations. 1. [22135](https://github.com/influxdata/influxdb/pull/22135): Added route to return known resources. 1. [22311](https://github.com/influxdata/influxdb/pull/22311): Add `storage-no-validate-field-size` config to `influxd` to disable enforcement of max field size. +1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data. +1. [22322](https://github.com/influxdata/influxdb/pull/22322): Add support for `merge_hll`, `sum_hll`, and `count_hll` in InfluxQL. ### Bug Fixes @@ -75,7 +77,6 @@ Because of the version bump to `go`, the macOS build for this release requires a 1. [21910](https://github.com/influxdata/influxdb/pull/21910): Added `--ui-disabled` option to `influxd` to allow for running with the UI disabled. 1. [21958](https://github.com/influxdata/influxdb/pull/21958): Telemetry improvements: Do not record telemetry data for non-existant paths; replace invalid static asset paths with a slug. 1. [22023](https://github.com/influxdata/influxdb/pull/22023): Upgrade Flux to v0.124.0. -1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data. ### Bug Fixes diff --git a/influxql/query/call_iterator.go b/influxql/query/call_iterator.go index 680b9316d7..936d0886dd 100644 --- a/influxql/query/call_iterator.go +++ b/influxql/query/call_iterator.go @@ -53,6 +53,10 @@ func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return newLastIterator(input, opt) case "mean": return newMeanIterator(input, opt) + case "sum_hll": + return NewSumHllIterator(input, opt) + case "merge_hll": + return NewMergeHllIterator(input, opt) default: return nil, fmt.Errorf("unsupported function call: %s", name) } @@ -1529,3 +1533,68 @@ func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) return nil, fmt.Errorf("unsupported integral iterator type: %T", input) } } + +// NewSumHllIterator returns an iterator for operating on a distinct() call. +func NewSumHllIterator(input Iterator, opt IteratorOptions) (Iterator, error) { + switch input := input.(type) { + case FloatIterator: + createFn := func() (FloatPointAggregator, StringPointEmitter) { + fn := NewFloatSumHllReducer() + return fn, fn + } + return newFloatReduceStringIterator(input, opt, createFn), nil + case IntegerIterator: + createFn := func() (IntegerPointAggregator, StringPointEmitter) { + fn := NewIntegerSumHllReducer() + return fn, fn + } + return newIntegerReduceStringIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, StringPointEmitter) { + fn := NewUnsignedSumHllReducer() + return fn, fn + } + return newUnsignedReduceStringIterator(input, opt, createFn), nil + case StringIterator: + createFn := func() (StringPointAggregator, StringPointEmitter) { + fn := NewStringSumHllReducer() + return fn, fn + } + return newStringReduceStringIterator(input, opt, createFn), nil + case BooleanIterator: + createFn := func() (BooleanPointAggregator, StringPointEmitter) { + fn := NewBooleanSumHllReducer() + return fn, fn + } + return newBooleanReduceStringIterator(input, opt, createFn), nil + default: + return nil, fmt.Errorf("unsupported sum_hll iterator type: %T", input) + } +} + +// NewSumHllIterator returns an iterator for operating on a distinct() call. +func NewMergeHllIterator(input Iterator, opt IteratorOptions) (Iterator, error) { + switch input := input.(type) { + case StringIterator: + createFn := func() (StringPointAggregator, StringPointEmitter) { + fn := NewStringMergeHllReducer() + return fn, fn + } + return newStringReduceStringIterator(input, opt, createFn), nil + default: + return nil, fmt.Errorf("unsupported merge_hll iterator type: %T", input) + } +} + +func NewCountHllIterator(input Iterator, opt IteratorOptions) (Iterator, error) { + switch input := input.(type) { + case StringIterator: + createFn := func() (StringPointAggregator, UnsignedPointEmitter) { + fn := NewCountHllReducer() + return fn, fn + } + return newStringStreamUnsignedIterator(input, createFn, opt), nil + default: + return nil, fmt.Errorf("unsupported count_hll iterator type: %T", input) + } +} diff --git a/influxql/query/compile.go b/influxql/query/compile.go index a712cbfafb..7dd120545d 100644 --- a/influxql/query/compile.go +++ b/influxql/query/compile.go @@ -308,6 +308,8 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error { return c.compileElapsed(expr.Args) case "integral": return c.compileIntegral(expr.Args) + case "count_hll": + return c.compileCountHll(expr.Args) case "holt_winters", "holt_winters_with_fit": withFit := expr.Name == "holt_winters_with_fit" return c.compileHoltWinters(expr.Args, withFit) @@ -393,7 +395,7 @@ func (c *compiledField) compileFunction(expr *influxql.Call) error { switch expr.Name { case "max", "min", "first", "last": // top/bottom are not included here since they are not typical functions. - case "count", "sum", "mean", "median", "mode", "stddev", "spread": + case "count", "sum", "mean", "median", "mode", "stddev", "spread", "sum_hll": // These functions are not considered selectors. c.global.OnlySelectors = false default: @@ -784,6 +786,19 @@ func (c *compiledField) compileIntegral(args []influxql.Expr) error { return c.compileSymbol("integral", args[0]) } +func (c *compiledField) compileCountHll(args []influxql.Expr) error { + if exp, got := 1, len(args); exp != got { + return fmt.Errorf("invalid number of arguments for count_hll, expected %d, got %d", exp, got) + } + c.global.OnlySelectors = false + switch arg0 := args[0].(type) { + case *influxql.Call: + return c.compileExpr(arg0) + default: + return c.compileSymbol("count_hll", arg0) + } +} + func (c *compiledField) compileHoltWinters(args []influxql.Expr, withFit bool) error { name := "holt_winters" if withFit { diff --git a/influxql/query/functions.gen.go b/influxql/query/functions.gen.go index 9c62a81605..07bad78d30 100644 --- a/influxql/query/functions.gen.go +++ b/influxql/query/functions.gen.go @@ -7,9 +7,13 @@ package query import ( + "bytes" + "encoding/binary" "math/rand" "sort" "time" + + "github.com/influxdata/influxdb/v2/pkg/estimator/hll" ) // FloatPointAggregator aggregates points to produce a single point. @@ -22,20 +26,6 @@ type FloatBulkPointAggregator interface { AggregateFloatBulk(points []FloatPoint) } -// AggregateFloatPoints feeds a slice of FloatPoint into an -// aggregator. If the aggregator is a FloatBulkPointAggregator, it will -// use the AggregateBulk method. -func AggregateFloatPoints(a FloatPointAggregator, points []FloatPoint) { - switch a := a.(type) { - case FloatBulkPointAggregator: - a.AggregateFloatBulk(points) - default: - for _, p := range points { - a.AggregateFloat(&p) - } - } -} - // FloatPointEmitter produces a single point from an aggregate. type FloatPointEmitter interface { Emit() []FloatPoint @@ -391,6 +381,33 @@ func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// FloatSumHllReducer returns the HLL sketch for a series, in string form +type FloatSumHllReducer struct { + plus *hll.Plus +} + +// func NewFloatSumHllReducer creates a new FloatSumHllReducer +func NewFloatSumHllReducer() *FloatSumHllReducer { + return &FloatSumHllReducer{plus: hll.NewDefaultPlus()} +} + +// AggregateFloat aggregates a point into the reducer. +func (r *FloatSumHllReducer) AggregateFloat(p *FloatPoint) { + + buf := new(bytes.Buffer) + binary.Write(buf, binary.BigEndian, p.Value) + b := buf.Bytes() + + r.plus.Add(b) +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *FloatSumHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, nil), + } +} + // FloatDistinctReducer returns the distinct points in a series. type FloatDistinctReducer struct { m map[float64]FloatPoint @@ -506,20 +523,6 @@ type IntegerBulkPointAggregator interface { AggregateIntegerBulk(points []IntegerPoint) } -// AggregateIntegerPoints feeds a slice of IntegerPoint into an -// aggregator. If the aggregator is a IntegerBulkPointAggregator, it will -// use the AggregateBulk method. -func AggregateIntegerPoints(a IntegerPointAggregator, points []IntegerPoint) { - switch a := a.(type) { - case IntegerBulkPointAggregator: - a.AggregateIntegerBulk(points) - default: - for _, p := range points { - a.AggregateInteger(&p) - } - } -} - // IntegerPointEmitter produces a single point from an aggregate. type IntegerPointEmitter interface { Emit() []IntegerPoint @@ -875,6 +878,33 @@ func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// IntegerSumHllReducer returns the HLL sketch for a series, in string form +type IntegerSumHllReducer struct { + plus *hll.Plus +} + +// func NewIntegerSumHllReducer creates a new IntegerSumHllReducer +func NewIntegerSumHllReducer() *IntegerSumHllReducer { + return &IntegerSumHllReducer{plus: hll.NewDefaultPlus()} +} + +// AggregateInteger aggregates a point into the reducer. +func (r *IntegerSumHllReducer) AggregateInteger(p *IntegerPoint) { + + buf := new(bytes.Buffer) + binary.Write(buf, binary.BigEndian, p.Value) + b := buf.Bytes() + + r.plus.Add(b) +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *IntegerSumHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, nil), + } +} + // IntegerDistinctReducer returns the distinct points in a series. type IntegerDistinctReducer struct { m map[int64]IntegerPoint @@ -990,20 +1020,6 @@ type UnsignedBulkPointAggregator interface { AggregateUnsignedBulk(points []UnsignedPoint) } -// AggregateUnsignedPoints feeds a slice of UnsignedPoint into an -// aggregator. If the aggregator is a UnsignedBulkPointAggregator, it will -// use the AggregateBulk method. -func AggregateUnsignedPoints(a UnsignedPointAggregator, points []UnsignedPoint) { - switch a := a.(type) { - case UnsignedBulkPointAggregator: - a.AggregateUnsignedBulk(points) - default: - for _, p := range points { - a.AggregateUnsigned(&p) - } - } -} - // UnsignedPointEmitter produces a single point from an aggregate. type UnsignedPointEmitter interface { Emit() []UnsignedPoint @@ -1359,6 +1375,33 @@ func (r *UnsignedSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// UnsignedSumHllReducer returns the HLL sketch for a series, in string form +type UnsignedSumHllReducer struct { + plus *hll.Plus +} + +// func NewUnsignedSumHllReducer creates a new UnsignedSumHllReducer +func NewUnsignedSumHllReducer() *UnsignedSumHllReducer { + return &UnsignedSumHllReducer{plus: hll.NewDefaultPlus()} +} + +// AggregateUnsigned aggregates a point into the reducer. +func (r *UnsignedSumHllReducer) AggregateUnsigned(p *UnsignedPoint) { + + buf := new(bytes.Buffer) + binary.Write(buf, binary.BigEndian, p.Value) + b := buf.Bytes() + + r.plus.Add(b) +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *UnsignedSumHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, nil), + } +} + // UnsignedDistinctReducer returns the distinct points in a series. type UnsignedDistinctReducer struct { m map[uint64]UnsignedPoint @@ -1474,20 +1517,6 @@ type StringBulkPointAggregator interface { AggregateStringBulk(points []StringPoint) } -// AggregateStringPoints feeds a slice of StringPoint into an -// aggregator. If the aggregator is a StringBulkPointAggregator, it will -// use the AggregateBulk method. -func AggregateStringPoints(a StringPointAggregator, points []StringPoint) { - switch a := a.(type) { - case StringBulkPointAggregator: - a.AggregateStringBulk(points) - default: - for _, p := range points { - a.AggregateString(&p) - } - } -} - // StringPointEmitter produces a single point from an aggregate. type StringPointEmitter interface { Emit() []StringPoint @@ -1843,6 +1872,31 @@ func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// StringSumHllReducer returns the HLL sketch for a series, in string form +type StringSumHllReducer struct { + plus *hll.Plus +} + +// func NewStringSumHllReducer creates a new StringSumHllReducer +func NewStringSumHllReducer() *StringSumHllReducer { + return &StringSumHllReducer{plus: hll.NewDefaultPlus()} +} + +// AggregateString aggregates a point into the reducer. +func (r *StringSumHllReducer) AggregateString(p *StringPoint) { + + b := []byte(p.Value) + + r.plus.Add(b) +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *StringSumHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, nil), + } +} + // StringDistinctReducer returns the distinct points in a series. type StringDistinctReducer struct { m map[string]StringPoint @@ -1958,20 +2012,6 @@ type BooleanBulkPointAggregator interface { AggregateBooleanBulk(points []BooleanPoint) } -// AggregateBooleanPoints feeds a slice of BooleanPoint into an -// aggregator. If the aggregator is a BooleanBulkPointAggregator, it will -// use the AggregateBulk method. -func AggregateBooleanPoints(a BooleanPointAggregator, points []BooleanPoint) { - switch a := a.(type) { - case BooleanBulkPointAggregator: - a.AggregateBooleanBulk(points) - default: - for _, p := range points { - a.AggregateBoolean(&p) - } - } -} - // BooleanPointEmitter produces a single point from an aggregate. type BooleanPointEmitter interface { Emit() []BooleanPoint @@ -2327,6 +2367,33 @@ func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// BooleanSumHllReducer returns the HLL sketch for a series, in string form +type BooleanSumHllReducer struct { + plus *hll.Plus +} + +// func NewBooleanSumHllReducer creates a new BooleanSumHllReducer +func NewBooleanSumHllReducer() *BooleanSumHllReducer { + return &BooleanSumHllReducer{plus: hll.NewDefaultPlus()} +} + +// AggregateBoolean aggregates a point into the reducer. +func (r *BooleanSumHllReducer) AggregateBoolean(p *BooleanPoint) { + + buf := new(bytes.Buffer) + binary.Write(buf, binary.BigEndian, p.Value) + b := buf.Bytes() + + r.plus.Add(b) +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *BooleanSumHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, nil), + } +} + // BooleanDistinctReducer returns the distinct points in a series. type BooleanDistinctReducer struct { m map[bool]BooleanPoint diff --git a/influxql/query/functions.gen.go.tmpl b/influxql/query/functions.gen.go.tmpl index bd0d15b0a6..4910bbda77 100644 --- a/influxql/query/functions.gen.go.tmpl +++ b/influxql/query/functions.gen.go.tmpl @@ -1,9 +1,13 @@ package query import ( +"encoding/binary" +"bytes" "sort" "time" "math/rand" + +"github.com/influxdata/influxdb/v2/pkg/estimator/hll" ) {{with $types := .}}{{range $k := $types}} @@ -18,20 +22,6 @@ type {{$k.Name}}BulkPointAggregator interface { Aggregate{{$k.Name}}Bulk(points []{{$k.Name}}Point) } -// Aggregate{{$k.Name}}Points feeds a slice of {{$k.Name}}Point into an -// aggregator. If the aggregator is a {{$k.Name}}BulkPointAggregator, it will -// use the AggregateBulk method. -func Aggregate{{$k.Name}}Points(a {{$k.Name}}PointAggregator, points []{{$k.Name}}Point) { - switch a := a.(type) { - case {{$k.Name}}BulkPointAggregator: - a.Aggregate{{$k.Name}}Bulk(points) - default: - for _, p := range points { - a.Aggregate{{$k.Name}}(&p) - } - } -} - // {{$k.Name}}PointEmitter produces a single point from an aggregate. type {{$k.Name}}PointEmitter interface { Emit() []{{$k.Name}}Point @@ -110,6 +100,35 @@ func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) } {{end}} +// {{$k.Name}}SumHllReducer returns the HLL sketch for a series, in string form +type {{$k.Name}}SumHllReducer struct { + plus *hll.Plus +} + +// func New{{$k.Name}}SumHllReducer creates a new {{$k.Name}}SumHllReducer +func New{{$k.Name}}SumHllReducer() *{{$k.Name}}SumHllReducer { + return &{{$k.Name}}SumHllReducer{plus:hll.NewDefaultPlus()} +} + +// Aggregate{{$k.Name}} aggregates a point into the reducer. +func (r *{{$k.Name}}SumHllReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) { + {{if eq $k.Type "string"}} + b := []byte(p.Value) + {{else}} + buf := new(bytes.Buffer) + binary.Write(buf, binary.BigEndian, p.Value) + b := buf.Bytes() + {{end}} + r.plus.Add(b) +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *{{$k.Name}}SumHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, nil), + } +} + // {{$k.Name}}DistinctReducer returns the distinct points in a series. type {{$k.Name}}DistinctReducer struct { m map[{{$k.Type}}]{{$k.Name}}Point diff --git a/influxql/query/functions.go b/influxql/query/functions.go index cb7b47a215..1c3a6bfc37 100644 --- a/influxql/query/functions.go +++ b/influxql/query/functions.go @@ -2,16 +2,22 @@ package query import ( "container/heap" + "encoding/base64" + "fmt" "math" "sort" "time" "github.com/influxdata/influxdb/v2/influxql/query/internal/gota" "github.com/influxdata/influxdb/v2/influxql/query/neldermead" + "github.com/influxdata/influxdb/v2/pkg/estimator/hll" "github.com/influxdata/influxql" ) -// queryFieldMapper is a FieldMapper that wraps another FieldMapper and exposes +var hllPrefix = []byte("HLL_") +var hllErrorPrefix = []byte("HLLERROR ") + +// FieldMapper is a FieldMapper that wraps another FieldMapper and exposes // the functions implemented by the query engine. type queryFieldMapper struct { influxql.FieldMapper @@ -2150,3 +2156,116 @@ func (r *UnsignedBottomReducer) Emit() []UnsignedPoint { sort.Sort(sort.Reverse(&h)) return points } + +type StringMergeHllReducer struct { + plus *hll.Plus + err error +} + +func NewStringMergeHllReducer() *StringMergeHllReducer { + return &StringMergeHllReducer{plus: nil} +} + +func unmarshalPlus(s string) (*hll.Plus, error) { + if string(hllPrefix) != s[:len(hllPrefix)] { + if string(hllErrorPrefix) == s[:len(hllErrorPrefix)] { + // parse a special error out of the string. + return nil, fmt.Errorf("%v", s[len(hllErrorPrefix):]) + } + return nil, fmt.Errorf("bad prefix for hll.Plus") + } + data := []byte(s[len(hllPrefix):]) + if len(data) == 0 { + // explicitly treat as empty no-op + return nil, nil + } + b := make([]byte, base64.StdEncoding.DecodedLen(len(data))) + _, _ = base64.StdEncoding.Decode(b, data) + h := new(hll.Plus) + if err := h.UnmarshalBinary(b); err != nil { + return nil, err + } + return h, nil +} + +func (r *StringMergeHllReducer) AggregateString(p *StringPoint) { + // we cannot return an error because returning an error slows all aggregation + // functions by ~1%. So we hack around it by marshalling the error as a string. + if r.err != nil { + return + } + h, err := unmarshalPlus(p.Value) + if err != nil { + r.err = err + return + } + if r.plus == nil { + r.plus = h + return + } + err = r.plus.Merge(h) + if err != nil { + r.err = err + return + } +} + +func marshalPlus(p *hll.Plus, err error) StringPoint { + if err != nil { + return StringPoint{ + Time: ZeroTime, + Value: string(hllErrorPrefix) + err.Error(), + } + } + if p == nil { + return StringPoint{ + Time: ZeroTime, + Value: string(hllPrefix), + } + } + b, err := p.MarshalBinary() + if err != nil { + return StringPoint{ + Time: ZeroTime, + Value: string(hllErrorPrefix) + err.Error(), + } + } + hllValue := make([]byte, len(hllPrefix)+base64.StdEncoding.EncodedLen(len(b))) + copy(hllValue, hllPrefix) + base64.StdEncoding.Encode(hllValue[len(hllPrefix):], b) + return StringPoint{ + Time: ZeroTime, + Value: string(hllValue), + } +} + +func (r *StringMergeHllReducer) Emit() []StringPoint { + return []StringPoint{ + marshalPlus(r.plus, r.err), + } +} + +type CountHllReducer struct { + next UnsignedPoint +} + +func NewCountHllReducer() *CountHllReducer { + return &CountHllReducer{} +} + +func (r *CountHllReducer) AggregateString(p *StringPoint) { + r.next.Name = p.Name + r.next.Time = p.Time + h, err := unmarshalPlus(p.Value) + if err != nil { + r.next.Value = 0 + return + } + r.next.Value = h.Count() +} + +func (r *CountHllReducer) Emit() []UnsignedPoint { + return []UnsignedPoint{ + r.next, + } +} diff --git a/influxql/query/functions_test.go b/influxql/query/functions_test.go index 94e33d1952..71d86b1f32 100644 --- a/influxql/query/functions_test.go +++ b/influxql/query/functions_test.go @@ -1,7 +1,11 @@ package query_test import ( + "crypto/sha1" + "fmt" "math" + "math/rand" + "strconv" "testing" "time" @@ -9,6 +13,8 @@ import ( "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/pkg/deep" "github.com/influxdata/influxql" + tassert "github.com/stretchr/testify/assert" + trequire "github.com/stretchr/testify/require" ) func almostEqual(got, exp float64) bool { @@ -497,3 +503,79 @@ func TestSample_SampleSizeGreaterThanNumPoints(t *testing.T) { t.Fatalf("unexpected points: %s", spew.Sdump(points)) } } + +func TestHll_SumAndMergeHll(t *testing.T) { + assert := tassert.New(t) + require := trequire.New(t) + + // Make 3000 random strings + r := rand.New(rand.NewSource(42)) + input := make([]*query.StringPoint, 0, 3000) + for i := 0; i < 3000; i++ { + input = append(input, &query.StringPoint{Value: strconv.FormatUint(r.Uint64(), 10)}) + } + + // Insert overlapping sections of the same points array to different reducers + s1 := query.NewStringSumHllReducer() + for _, p := range input[:2000] { + s1.AggregateString(p) + } + point1 := s1.Emit() + s2 := query.NewStringSumHllReducer() + for _, p := range input[1000:] { + s2.AggregateString(p) + } + point2 := s2.Emit() + // Demonstration of the input: repeatably seeded pseudorandom + // stringified integers (so we are testing the counting of unique strings, + // not unique integers). + require.Equal("17190211103962133664", input[2999].Value) + + checkStringFingerprint := func(prefix string, length int, hash string, check string) { + assert.Equal(length, len(check)) + assert.Equal(prefix, check[:len(prefix)]) + h := sha1.New() + h.Write([]byte(check)) + assert.Equal(hash, fmt.Sprintf("%x", h.Sum(nil))) + } + + require.Equal(len(point1), 1) + require.Equal(len(point2), 1) + checkStringFingerprint("HLL_AhABAAAAAAAAB9BIDQAJAAAUUaKsA4K/AtARkuMBsJwEyp8O", + 6964, "c59fa799fe8e78ab5347de385bf2a7c5b8085882", point1[0].Value) + checkStringFingerprint("HLL_AhABAAAAAAAAB9Db0QAHAAAUaP6aAaSRAoK/Ap70B/xSysEE", + 6996, "5f1696dfb455baab7fdb56ffd2197d27b09d6dcf", point2[0].Value) + + m := query.NewStringMergeHllReducer() + m.AggregateString(&point1[0]) + m.AggregateString(&point2[0]) + merged := m.Emit() + require.Equal(1, len(merged)) + checkStringFingerprint("HLL_AhAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAA", + 87396, "e5320860aa322efe9af268e171df916d2186c75f", merged[0].Value) + + m.AggregateString(&query.StringPoint{ + Time: query.ZeroTime, + Value: "some random string", + }) + mergedError := m.Emit() + // mid-level errors are: + require.Equal(1, len(mergedError)) + assert.Equal("HLLERROR bad prefix for hll.Plus", mergedError[0].Value) + + c := query.NewCountHllReducer() + c.AggregateString(&merged[0]) + counted := c.Emit() + require.Equal(1, len(counted)) + // Counted 4000 points, 3000 distinct points, answer is 2994 ≈ 3000 + assert.Equal(uint64(2994), counted[0].Value) + + c.AggregateString(&query.StringPoint{ + Time: query.ZeroTime, + Value: "HLLERROR bad prefix for hll.Plus", + }) + counted = c.Emit() + require.Equal(1, len(counted)) + // When we hit marshal/unmarshal errors + assert.Equal(uint64(0), counted[0].Value) +} diff --git a/influxql/query/iterator.go b/influxql/query/iterator.go index 50cf1422c6..8928d214f9 100644 --- a/influxql/query/iterator.go +++ b/influxql/query/iterator.go @@ -144,6 +144,13 @@ func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) { Args: call.Args, } } + // When merging the sum_hll() function, use merge_hll() to sum the counted points. + if call.Name == "sum_hll" { + opt.Expr = &influxql.Call{ + Name: "merge_hll", + Args: call.Args, + } + } return NewCallIterator(itr, opt) } diff --git a/influxql/query/iterator_test.go b/influxql/query/iterator_test.go index 632de0be1b..23f2adf69d 100644 --- a/influxql/query/iterator_test.go +++ b/influxql/query/iterator_test.go @@ -1568,6 +1568,64 @@ func TestIterator_EncodeDecode(t *testing.T) { } } +// Test implementation of query.IntegerIterator +type IntegerConstIterator struct { + numPoints int + Closed bool + stats query.IteratorStats + point query.IntegerPoint +} + +func BenchmarkIterator_Aggregator(b *testing.B) { + input := &IntegerConstIterator{ + numPoints: b.N, + Closed: false, + stats: query.IteratorStats{}, + point: query.IntegerPoint{ + Name: "constPoint", + Value: 1, + }, + } + opt := query.IteratorOptions{ + Interval: query.Interval{ + Duration: 100 * time.Minute, + }, + Expr: &influxql.Call{ + Name: "count", + }, + } + + counter, err := query.NewCallIterator(input, opt) + if err != nil { + b.Fatalf("Bad counter: %v", err) + } + + b.ResetTimer() + point, err := counter.(query.IntegerIterator).Next() + if err != nil { + b.Fatalf("Unexpected error %v", err) + } + if point == nil { + b.Fatal("Expected point not to be nil") + } + if point.Value != int64(b.N) { + b.Fatalf("Expected %v != %v points", b.N, point.Value) + } +} + +func (itr *IntegerConstIterator) Stats() query.IteratorStats { return itr.stats } +func (itr *IntegerConstIterator) Close() error { itr.Closed = true; return nil } + +// Next returns the next value and shifts it off the beginning of the points slice. +func (itr *IntegerConstIterator) Next() (*query.IntegerPoint, error) { + if itr.numPoints == 0 || itr.Closed { + return nil, nil + } + itr.numPoints-- + itr.point.Time++ + return &itr.point, nil +} + // Test implementation of influxql.FloatIterator type FloatIterator struct { Context context.Context diff --git a/influxql/query/select.go b/influxql/query/select.go index 58ea0f37fb..1f0d203bb0 100644 --- a/influxql/query/select.go +++ b/influxql/query/select.go @@ -285,7 +285,7 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ opt.Interval = Interval{} return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeFitData, interval) - case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "exponential_moving_average", "double_exponential_moving_average", "triple_exponential_moving_average", "relative_strength_index", "triple_exponential_derivative", "kaufmans_efficiency_ratio", "kaufmans_adaptive_moving_average", "chande_momentum_oscillator", "elapsed": + case "count_hll", "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "exponential_moving_average", "double_exponential_moving_average", "triple_exponential_moving_average", "relative_strength_index", "triple_exponential_derivative", "kaufmans_efficiency_ratio", "kaufmans_adaptive_moving_average", "chande_momentum_oscillator", "elapsed": if !opt.Interval.IsZero() { if opt.Ascending { opt.StartTime -= int64(opt.Interval.Duration) @@ -301,6 +301,8 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ } switch expr.Name { + case "count_hll": + return NewCountHllIterator(input, opt) case "derivative", "non_negative_derivative": interval := opt.DerivativeInterval() isNonNegative := (expr.Name == "non_negative_derivative") @@ -541,7 +543,7 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ } } fallthrough - case "min", "max", "sum", "first", "last", "mean": + case "min", "max", "sum", "first", "last", "mean", "sum_hll", "merge_hll": return b.callIterator(ctx, expr, opt) case "median": opt.Ordered = true diff --git a/influxql/query/select_test.go b/influxql/query/select_test.go index 5bdb428df1..65749ba363 100644 --- a/influxql/query/select_test.go +++ b/influxql/query/select_test.go @@ -17,6 +17,27 @@ import ( // Second represents a helper for type converting durations. const Second = int64(time.Second) +func randomFloatSlice(seed int64, length int) []float64 { + r := rand.New(rand.NewSource(seed)) + out := make([]float64, 0, length) + for i := 0; i < 3000; i++ { + out = append(out, r.Float64()) + } + return out +} + +func floatIterator(fSlice []float64, name, tags string, startTime, step int64) *FloatIterator { + p := make([]query.FloatPoint, 0, len(fSlice)) + currentTime := startTime + for _, f := range fSlice { + p = append(p, query.FloatPoint{Name: name, Tags: ParseTags(tags), Time: currentTime, Value: f}) + currentTime += step + } + return &FloatIterator{ + Points: p, + } +} + func TestSelect(t *testing.T) { for _, tt := range []struct { name string @@ -55,6 +76,30 @@ func TestSelect(t *testing.T) { {Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(10)}}, }, }, + { + name: "count_hll", + q: `SELECT count_hll(sum_hll(value)) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Float, + itrs: []query.Iterator{ + floatIterator(randomFloatSlice(42, 2000), "cpu", "region=west,host=A", 0*Second, 1), + &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 31 * Second, Value: 100}, + }}, + floatIterator(randomFloatSlice(42, 3000)[1000:], "cpu", "region=south,host=A", 0*Second, 1), + &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 0 * Second, Value: 20}, + }}, + }, + rows: []query.Row{ + // Note that for the first aggregate there are 2000 points in each series, but only 3000 unique points, 2994 ≈ 3000 + {Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{uint64(2994)}}, + {Time: 10 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{uint64(1)}}, + {Time: 30 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{uint64(1)}}, + {Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{uint64(1)}}, + }, + }, { name: "Distinct_Float", q: `SELECT distinct(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, diff --git a/pkg/estimator/hll/hll.go b/pkg/estimator/hll/hll.go index 0c60adb593..002dae1c1c 100644 --- a/pkg/estimator/hll/hll.go +++ b/pkg/estimator/hll/hll.go @@ -151,9 +151,10 @@ func (h *Plus) Add(v []byte) { if uint32(len(h.tmpSet))*100 > h.m { h.mergeSparse() - if uint32(h.sparseList.Len()) > h.m { - h.toNormal() - } + } + if uint32(h.sparseList.Len()) > h.m { + h.mergeSparse() + h.toNormal() } } else { i := bextr(x, 64-h.p, h.p) // {x63,...,x64-p} @@ -241,6 +242,10 @@ func (h *Plus) MarshalBinary() (data []byte, err error) { return nil, nil } + if h.sparse { + h.mergeSparse() + } + // Marshal a version marker. data = append(data, version) @@ -251,7 +256,7 @@ func (h *Plus) MarshalBinary() (data []byte, err error) { // It's using the sparse representation. data = append(data, byte(1)) - // Add the tmp_set + // Add the tmp_set (should be empty) tsdata, err := h.tmpSet.MarshalBinary() if err != nil { return nil, err diff --git a/pkg/estimator/hll/hll_test.go b/pkg/estimator/hll/hll_test.go index a0b82e1b49..1d24f0b13e 100644 --- a/pkg/estimator/hll/hll_test.go +++ b/pkg/estimator/hll/hll_test.go @@ -1,7 +1,6 @@ package hll import ( - crand "crypto/rand" "encoding/binary" "fmt" "math" @@ -469,9 +468,11 @@ func TestPlus_Marshal_Unmarshal_Sparse(t *testing.T) { h.sparse = true h.tmpSet = map[uint32]struct{}{26: {}, 40: {}} + src := rand.New(rand.NewSource(6611)) + // Add a bunch of values to the sparse representation. for i := 0; i < 10; i++ { - h.sparseList.Append(uint32(rand.Int())) + h.sparseList.Append(uint32(src.Int())) } data, err := h.MarshalBinary() @@ -501,9 +502,11 @@ func TestPlus_Marshal_Unmarshal_Dense(t *testing.T) { h, _ := NewPlus(4) h.sparse = false + src := rand.New(rand.NewSource(1688)) + // Add a bunch of values to the dense representation. for i := 0; i < 10; i++ { - h.denseList = append(h.denseList, uint8(rand.Int())) + h.denseList = append(h.denseList, uint8(src.Int())) } data, err := h.MarshalBinary() @@ -539,9 +542,11 @@ func TestPlus_Marshal_Unmarshal_Count(t *testing.T) { count := make(map[string]struct{}, 1000000) h, _ := NewPlus(16) + src := rand.New(rand.NewSource(6828)) + buf := make([]byte, 8) for i := 0; i < 1000000; i++ { - if _, err := crand.Read(buf); err != nil { + if _, err := src.Read(buf); err != nil { panic(err) } @@ -577,7 +582,7 @@ func TestPlus_Marshal_Unmarshal_Count(t *testing.T) { // Add some more values. for i := 0; i < 1000000; i++ { - if _, err := crand.Read(buf); err != nil { + if _, err := src.Read(buf); err != nil { panic(err) } @@ -605,13 +610,13 @@ func NewTestPlus(p uint8) *Plus { } // Generate random data to add to the sketch. -func genData(n int) [][]byte { +func genData(n int, src *rand.Rand) [][]byte { out := make([][]byte, 0, n) buf := make([]byte, 8) for i := 0; i < n; i++ { // generate 8 random bytes - n, err := rand.Read(buf) + n, err := src.Read(buf) if err != nil { panic(err) } else if n != 8 { @@ -630,10 +635,11 @@ func genData(n int) [][]byte { var benchdata = map[int][][]byte{} func benchmarkPlusAdd(b *testing.B, h *Plus, n int) { + src := rand.New(rand.NewSource(9938)) blobs, ok := benchdata[n] if !ok { // Generate it. - benchdata[n] = genData(n) + benchdata[n] = genData(n, src) blobs = benchdata[n] } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 69cfb6cd15..cd9586fa2a 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2437,7 +2437,7 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal if e.index.Type() == tsdb.TSI1IndexName { indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} seriesOpt := opt - if len(opt.Dimensions) == 0 && call.Name == "count" { + if len(opt.Dimensions) == 0 && (call.Name == "count" || call.Name == "sum_hll") { // no point ordering the series if we are just counting all of them seriesOpt.Ordered = false }