feat(influxql): Add hyper log log operators (#22322)

In addition to helping with normal queries, this can improve the 'SHOW CARDINALITY'
meta-queries.
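For example, `count_hll(sum_hll(value))` now computes an approximate distinct count of a field, and `merge_hll` combines partial sketches when iterators are merged across shards (see the tests below).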


Co-authored-by: Sam Arnold <sarnold@influxdata.com>
Daniel Moran 2021-08-30 15:46:46 -04:00 committed by GitHub
parent 6bb95ae6fe
commit 37088e8f53
14 changed files with 597 additions and 102 deletions


@@ -49,6 +49,8 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21972](https://github.com/influxdata/influxdb/pull/21972): Added support for notebooks and annotations.
1. [22135](https://github.com/influxdata/influxdb/pull/22135): Added route to return known resources.
1. [22311](https://github.com/influxdata/influxdb/pull/22311): Add `storage-no-validate-field-size` config to `influxd` to disable enforcement of max field size.
1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data.
1. [22322](https://github.com/influxdata/influxdb/pull/22322): Add support for `merge_hll`, `sum_hll`, and `count_hll` in InfluxQL.
### Bug Fixes
@@ -75,7 +77,6 @@ Because of the version bump to `go`, the macOS build for this release requires a
1. [21910](https://github.com/influxdata/influxdb/pull/21910): Added `--ui-disabled` option to `influxd` to allow for running with the UI disabled.
1. [21958](https://github.com/influxdata/influxdb/pull/21958): Telemetry improvements: Do not record telemetry data for non-existent paths; replace invalid static asset paths with a slug.
1. [22023](https://github.com/influxdata/influxdb/pull/22023): Upgrade Flux to v0.124.0.
1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data.
### Bug Fixes


@@ -53,6 +53,10 @@ func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
return newLastIterator(input, opt)
case "mean":
return newMeanIterator(input, opt)
case "sum_hll":
return NewSumHllIterator(input, opt)
case "merge_hll":
return NewMergeHllIterator(input, opt)
default:
return nil, fmt.Errorf("unsupported function call: %s", name)
}
@@ -1529,3 +1533,68 @@ func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval)
return nil, fmt.Errorf("unsupported integral iterator type: %T", input)
}
}
// NewSumHllIterator returns an iterator for operating on a sum_hll() call.
func NewSumHllIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, StringPointEmitter) {
fn := NewFloatSumHllReducer()
return fn, fn
}
return newFloatReduceStringIterator(input, opt, createFn), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, StringPointEmitter) {
fn := NewIntegerSumHllReducer()
return fn, fn
}
return newIntegerReduceStringIterator(input, opt, createFn), nil
case UnsignedIterator:
createFn := func() (UnsignedPointAggregator, StringPointEmitter) {
fn := NewUnsignedSumHllReducer()
return fn, fn
}
return newUnsignedReduceStringIterator(input, opt, createFn), nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSumHllReducer()
return fn, fn
}
return newStringReduceStringIterator(input, opt, createFn), nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, StringPointEmitter) {
fn := NewBooleanSumHllReducer()
return fn, fn
}
return newBooleanReduceStringIterator(input, opt, createFn), nil
default:
return nil, fmt.Errorf("unsupported sum_hll iterator type: %T", input)
}
}
// NewMergeHllIterator returns an iterator for operating on a merge_hll() call.
func NewMergeHllIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringMergeHllReducer()
return fn, fn
}
return newStringReduceStringIterator(input, opt, createFn), nil
default:
return nil, fmt.Errorf("unsupported merge_hll iterator type: %T", input)
}
}
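// NewCountHllIterator returns an iterator for operating on a count_hll() call.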
func NewCountHllIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case StringIterator:
createFn := func() (StringPointAggregator, UnsignedPointEmitter) {
fn := NewCountHllReducer()
return fn, fn
}
return newStringStreamUnsignedIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported count_hll iterator type: %T", input)
}
}
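Note the shape of the pipeline: the sum_hll reducers accept any input point type but emit string points (the marshaled sketch), merge_hll consumes and re-emits sketch strings, and count_hll converts a sketch string into an unsigned count.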


@@ -308,6 +308,8 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error {
return c.compileElapsed(expr.Args)
case "integral":
return c.compileIntegral(expr.Args)
case "count_hll":
return c.compileCountHll(expr.Args)
case "holt_winters", "holt_winters_with_fit":
withFit := expr.Name == "holt_winters_with_fit"
return c.compileHoltWinters(expr.Args, withFit)
@@ -393,7 +395,7 @@ func (c *compiledField) compileFunction(expr *influxql.Call) error {
switch expr.Name {
case "max", "min", "first", "last":
// top/bottom are not included here since they are not typical functions.
case "count", "sum", "mean", "median", "mode", "stddev", "spread":
case "count", "sum", "mean", "median", "mode", "stddev", "spread", "sum_hll":
// These functions are not considered selectors.
c.global.OnlySelectors = false
default:
@@ -784,6 +786,19 @@ func (c *compiledField) compileIntegral(args []influxql.Expr) error {
return c.compileSymbol("integral", args[0])
}
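// compileCountHll type-checks a count_hll() call: it takes exactly one
// argument, which may be either a nested aggregate call (e.g.
// count_hll(sum_hll(value))) or a bare field symbol.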
func (c *compiledField) compileCountHll(args []influxql.Expr) error {
if exp, got := 1, len(args); exp != got {
return fmt.Errorf("invalid number of arguments for count_hll, expected %d, got %d", exp, got)
}
c.global.OnlySelectors = false
switch arg0 := args[0].(type) {
case *influxql.Call:
return c.compileExpr(arg0)
default:
return c.compileSymbol("count_hll", arg0)
}
}
func (c *compiledField) compileHoltWinters(args []influxql.Expr, withFit bool) error {
name := "holt_winters"
if withFit {


@@ -7,9 +7,13 @@
package query
import (
"bytes"
"encoding/binary"
"math/rand"
"sort"
"time"
"github.com/influxdata/influxdb/v2/pkg/estimator/hll"
)
// FloatPointAggregator aggregates points to produce a single point.
@@ -22,20 +26,6 @@ type FloatBulkPointAggregator interface {
AggregateFloatBulk(points []FloatPoint)
}
// AggregateFloatPoints feeds a slice of FloatPoint into an
// aggregator. If the aggregator is a FloatBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateFloatPoints(a FloatPointAggregator, points []FloatPoint) {
switch a := a.(type) {
case FloatBulkPointAggregator:
a.AggregateFloatBulk(points)
default:
for _, p := range points {
a.AggregateFloat(&p)
}
}
}
// FloatPointEmitter produces a single point from an aggregate.
type FloatPointEmitter interface {
Emit() []FloatPoint
@@ -391,6 +381,33 @@ func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// FloatSumHllReducer computes the HLL sketch for a series, emitted in string form.
type FloatSumHllReducer struct {
plus *hll.Plus
}
// NewFloatSumHllReducer creates a new FloatSumHllReducer.
func NewFloatSumHllReducer() *FloatSumHllReducer {
return &FloatSumHllReducer{plus: hll.NewDefaultPlus()}
}
// AggregateFloat aggregates a point into the reducer.
func (r *FloatSumHllReducer) AggregateFloat(p *FloatPoint) {
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, p.Value)
b := buf.Bytes()
r.plus.Add(b)
}
// Emit emits the marshaled HLL sketch as a single string point.
func (r *FloatSumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
// FloatDistinctReducer returns the distinct points in a series.
type FloatDistinctReducer struct {
m map[float64]FloatPoint
@@ -506,20 +523,6 @@ type IntegerBulkPointAggregator interface {
AggregateIntegerBulk(points []IntegerPoint)
}
// AggregateIntegerPoints feeds a slice of IntegerPoint into an
// aggregator. If the aggregator is a IntegerBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateIntegerPoints(a IntegerPointAggregator, points []IntegerPoint) {
switch a := a.(type) {
case IntegerBulkPointAggregator:
a.AggregateIntegerBulk(points)
default:
for _, p := range points {
a.AggregateInteger(&p)
}
}
}
// IntegerPointEmitter produces a single point from an aggregate.
type IntegerPointEmitter interface {
Emit() []IntegerPoint
@@ -875,6 +878,33 @@ func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// IntegerSumHllReducer computes the HLL sketch for a series, emitted in string form.
type IntegerSumHllReducer struct {
plus *hll.Plus
}
// NewIntegerSumHllReducer creates a new IntegerSumHllReducer.
func NewIntegerSumHllReducer() *IntegerSumHllReducer {
return &IntegerSumHllReducer{plus: hll.NewDefaultPlus()}
}
// AggregateInteger aggregates a point into the reducer.
func (r *IntegerSumHllReducer) AggregateInteger(p *IntegerPoint) {
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, p.Value)
b := buf.Bytes()
r.plus.Add(b)
}
// Emit emits the marshaled HLL sketch as a single string point.
func (r *IntegerSumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
// IntegerDistinctReducer returns the distinct points in a series.
type IntegerDistinctReducer struct {
m map[int64]IntegerPoint
@@ -990,20 +1020,6 @@ type UnsignedBulkPointAggregator interface {
AggregateUnsignedBulk(points []UnsignedPoint)
}
// AggregateUnsignedPoints feeds a slice of UnsignedPoint into an
// aggregator. If the aggregator is a UnsignedBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateUnsignedPoints(a UnsignedPointAggregator, points []UnsignedPoint) {
switch a := a.(type) {
case UnsignedBulkPointAggregator:
a.AggregateUnsignedBulk(points)
default:
for _, p := range points {
a.AggregateUnsigned(&p)
}
}
}
// UnsignedPointEmitter produces a single point from an aggregate.
type UnsignedPointEmitter interface {
Emit() []UnsignedPoint
@@ -1359,6 +1375,33 @@ func (r *UnsignedSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// UnsignedSumHllReducer computes the HLL sketch for a series, emitted in string form.
type UnsignedSumHllReducer struct {
plus *hll.Plus
}
// NewUnsignedSumHllReducer creates a new UnsignedSumHllReducer.
func NewUnsignedSumHllReducer() *UnsignedSumHllReducer {
return &UnsignedSumHllReducer{plus: hll.NewDefaultPlus()}
}
// AggregateUnsigned aggregates a point into the reducer.
func (r *UnsignedSumHllReducer) AggregateUnsigned(p *UnsignedPoint) {
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, p.Value)
b := buf.Bytes()
r.plus.Add(b)
}
// Emit emits the marshaled HLL sketch as a single string point.
func (r *UnsignedSumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
// UnsignedDistinctReducer returns the distinct points in a series.
type UnsignedDistinctReducer struct {
m map[uint64]UnsignedPoint
@@ -1474,20 +1517,6 @@ type StringBulkPointAggregator interface {
AggregateStringBulk(points []StringPoint)
}
// AggregateStringPoints feeds a slice of StringPoint into an
// aggregator. If the aggregator is a StringBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateStringPoints(a StringPointAggregator, points []StringPoint) {
switch a := a.(type) {
case StringBulkPointAggregator:
a.AggregateStringBulk(points)
default:
for _, p := range points {
a.AggregateString(&p)
}
}
}
// StringPointEmitter produces a single point from an aggregate.
type StringPointEmitter interface {
Emit() []StringPoint
@@ -1843,6 +1872,31 @@ func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// StringSumHllReducer computes the HLL sketch for a series, emitted in string form.
type StringSumHllReducer struct {
plus *hll.Plus
}
// NewStringSumHllReducer creates a new StringSumHllReducer.
func NewStringSumHllReducer() *StringSumHllReducer {
return &StringSumHllReducer{plus: hll.NewDefaultPlus()}
}
// AggregateString aggregates a point into the reducer.
func (r *StringSumHllReducer) AggregateString(p *StringPoint) {
b := []byte(p.Value)
r.plus.Add(b)
}
// Emit emits the marshaled HLL sketch as a single string point.
func (r *StringSumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
// StringDistinctReducer returns the distinct points in a series.
type StringDistinctReducer struct {
m map[string]StringPoint
@@ -1958,20 +2012,6 @@ type BooleanBulkPointAggregator interface {
AggregateBooleanBulk(points []BooleanPoint)
}
// AggregateBooleanPoints feeds a slice of BooleanPoint into an
// aggregator. If the aggregator is a BooleanBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateBooleanPoints(a BooleanPointAggregator, points []BooleanPoint) {
switch a := a.(type) {
case BooleanBulkPointAggregator:
a.AggregateBooleanBulk(points)
default:
for _, p := range points {
a.AggregateBoolean(&p)
}
}
}
// BooleanPointEmitter produces a single point from an aggregate.
type BooleanPointEmitter interface {
Emit() []BooleanPoint
@@ -2327,6 +2367,33 @@ func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// BooleanSumHllReducer computes the HLL sketch for a series, emitted in string form.
type BooleanSumHllReducer struct {
plus *hll.Plus
}
// NewBooleanSumHllReducer creates a new BooleanSumHllReducer.
func NewBooleanSumHllReducer() *BooleanSumHllReducer {
return &BooleanSumHllReducer{plus: hll.NewDefaultPlus()}
}
// AggregateBoolean aggregates a point into the reducer.
func (r *BooleanSumHllReducer) AggregateBoolean(p *BooleanPoint) {
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, p.Value)
b := buf.Bytes()
r.plus.Add(b)
}
// Emit emits the marshaled HLL sketch as a single string point.
func (r *BooleanSumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
// BooleanDistinctReducer returns the distinct points in a series.
type BooleanDistinctReducer struct {
m map[bool]BooleanPoint


@@ -1,9 +1,13 @@
package query
import (
"encoding/binary"
"bytes"
"sort"
"time"
"math/rand"
"github.com/influxdata/influxdb/v2/pkg/estimator/hll"
)
{{with $types := .}}{{range $k := $types}}
@@ -18,20 +22,6 @@ type {{$k.Name}}BulkPointAggregator interface {
Aggregate{{$k.Name}}Bulk(points []{{$k.Name}}Point)
}
// Aggregate{{$k.Name}}Points feeds a slice of {{$k.Name}}Point into an
// aggregator. If the aggregator is a {{$k.Name}}BulkPointAggregator, it will
// use the AggregateBulk method.
func Aggregate{{$k.Name}}Points(a {{$k.Name}}PointAggregator, points []{{$k.Name}}Point) {
switch a := a.(type) {
case {{$k.Name}}BulkPointAggregator:
a.Aggregate{{$k.Name}}Bulk(points)
default:
for _, p := range points {
a.Aggregate{{$k.Name}}(&p)
}
}
}
// {{$k.Name}}PointEmitter produces a single point from an aggregate.
type {{$k.Name}}PointEmitter interface {
Emit() []{{$k.Name}}Point
@@ -110,6 +100,35 @@ func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer)
}
{{end}}
// {{$k.Name}}SumHllReducer computes the HLL sketch for a series, emitted in string form.
type {{$k.Name}}SumHllReducer struct {
plus *hll.Plus
}
// New{{$k.Name}}SumHllReducer creates a new {{$k.Name}}SumHllReducer.
func New{{$k.Name}}SumHllReducer() *{{$k.Name}}SumHllReducer {
return &{{$k.Name}}SumHllReducer{plus: hll.NewDefaultPlus()}
}
// Aggregate{{$k.Name}} aggregates a point into the reducer.
func (r *{{$k.Name}}SumHllReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
{{if eq $k.Type "string"}}
b := []byte(p.Value)
{{else}}
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, p.Value)
b := buf.Bytes()
{{end}}
r.plus.Add(b)
}
// Emit emits the marshaled HLL sketch as a single string point.
func (r *{{$k.Name}}SumHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, nil),
}
}
// {{$k.Name}}DistinctReducer returns the distinct points in a series.
type {{$k.Name}}DistinctReducer struct {
m map[{{$k.Type}}]{{$k.Name}}Point


@@ -2,16 +2,22 @@ package query
import (
"container/heap"
"encoding/base64"
"fmt"
"math"
"sort"
"time"
"github.com/influxdata/influxdb/v2/influxql/query/internal/gota"
"github.com/influxdata/influxdb/v2/influxql/query/neldermead"
"github.com/influxdata/influxdb/v2/pkg/estimator/hll"
"github.com/influxdata/influxql"
)
// queryFieldMapper is a FieldMapper that wraps another FieldMapper and exposes
var hllPrefix = []byte("HLL_")
var hllErrorPrefix = []byte("HLLERROR ")
// queryFieldMapper is a FieldMapper that wraps another FieldMapper and exposes
// the functions implemented by the query engine.
type queryFieldMapper struct {
influxql.FieldMapper
@@ -2150,3 +2156,116 @@ func (r *UnsignedBottomReducer) Emit() []UnsignedPoint {
sort.Sort(sort.Reverse(&h))
return points
}
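// StringMergeHllReducer merges HLL sketches that arrive in their marshaled string form.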
type StringMergeHllReducer struct {
plus *hll.Plus
err error
}
func NewStringMergeHllReducer() *StringMergeHllReducer {
return &StringMergeHllReducer{plus: nil}
}
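// unmarshalPlus decodes the string form of a sketch: an "HLL_" prefix followed
// by the base64-encoded binary sketch. An "HLLERROR " prefix instead carries a
// marshaled error, and a bare "HLL_" prefix is an empty sketch, decoded as nil.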
func unmarshalPlus(s string) (*hll.Plus, error) {
// Use HasPrefix so that inputs shorter than the prefix cannot panic.
if !strings.HasPrefix(s, string(hllPrefix)) {
if strings.HasPrefix(s, string(hllErrorPrefix)) {
// parse a special error out of the string.
return nil, fmt.Errorf("%v", s[len(hllErrorPrefix):])
}
return nil, fmt.Errorf("bad prefix for hll.Plus")
}
data := []byte(s[len(hllPrefix):])
if len(data) == 0 {
// explicitly treat as empty no-op
return nil, nil
}
b := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(b, data)
if err != nil {
return nil, err
}
h := new(hll.Plus)
if err := h.UnmarshalBinary(b[:n]); err != nil {
return nil, err
}
return h, nil
}
func (r *StringMergeHllReducer) AggregateString(p *StringPoint) {
// We cannot return an error here: plumbing one through slows all aggregation
// functions by ~1%, so we marshal the error into the output string instead.
if r.err != nil {
return
}
h, err := unmarshalPlus(p.Value)
if err != nil {
r.err = err
return
}
if r.plus == nil {
r.plus = h
return
}
err = r.plus.Merge(h)
if err != nil {
r.err = err
return
}
}
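// marshalPlus is the inverse of unmarshalPlus: it renders a sketch (or a
// pending error) into the prefixed string form emitted by the sum_hll and
// merge_hll reducers.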
func marshalPlus(p *hll.Plus, err error) StringPoint {
if err != nil {
return StringPoint{
Time: ZeroTime,
Value: string(hllErrorPrefix) + err.Error(),
}
}
if p == nil {
return StringPoint{
Time: ZeroTime,
Value: string(hllPrefix),
}
}
b, err := p.MarshalBinary()
if err != nil {
return StringPoint{
Time: ZeroTime,
Value: string(hllErrorPrefix) + err.Error(),
}
}
hllValue := make([]byte, len(hllPrefix)+base64.StdEncoding.EncodedLen(len(b)))
copy(hllValue, hllPrefix)
base64.StdEncoding.Encode(hllValue[len(hllPrefix):], b)
return StringPoint{
Time: ZeroTime,
Value: string(hllValue),
}
}
func (r *StringMergeHllReducer) Emit() []StringPoint {
return []StringPoint{
marshalPlus(r.plus, r.err),
}
}
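// CountHllReducer unmarshals a sketch string and emits its cardinality
// estimate as an unsigned point.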
type CountHllReducer struct {
next UnsignedPoint
}
func NewCountHllReducer() *CountHllReducer {
return &CountHllReducer{}
}
func (r *CountHllReducer) AggregateString(p *StringPoint) {
r.next.Name = p.Name
r.next.Time = p.Time
h, err := unmarshalPlus(p.Value)
if err != nil {
r.next.Value = 0
return
}
r.next.Value = h.Count()
}
func (r *CountHllReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{
r.next,
}
}


@@ -1,7 +1,11 @@
package query_test
import (
"crypto/sha1"
"fmt"
"math"
"math/rand"
"strconv"
"testing"
"time"
@@ -9,6 +13,8 @@ import (
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/pkg/deep"
"github.com/influxdata/influxql"
tassert "github.com/stretchr/testify/assert"
trequire "github.com/stretchr/testify/require"
)
func almostEqual(got, exp float64) bool {
@@ -497,3 +503,79 @@ func TestSample_SampleSizeGreaterThanNumPoints(t *testing.T) {
t.Fatalf("unexpected points: %s", spew.Sdump(points))
}
}
func TestHll_SumAndMergeHll(t *testing.T) {
assert := tassert.New(t)
require := trequire.New(t)
// Make 3000 random strings
r := rand.New(rand.NewSource(42))
input := make([]*query.StringPoint, 0, 3000)
for i := 0; i < 3000; i++ {
input = append(input, &query.StringPoint{Value: strconv.FormatUint(r.Uint64(), 10)})
}
// Feed overlapping sections of the same points slice into different reducers
s1 := query.NewStringSumHllReducer()
for _, p := range input[:2000] {
s1.AggregateString(p)
}
point1 := s1.Emit()
s2 := query.NewStringSumHllReducer()
for _, p := range input[1000:] {
s2.AggregateString(p)
}
point2 := s2.Emit()
// Demonstration of the input: repeatably seeded pseudorandom
// stringified integers (so we are testing the counting of unique strings,
// not unique integers).
require.Equal("17190211103962133664", input[2999].Value)
checkStringFingerprint := func(prefix string, length int, hash string, check string) {
assert.Equal(length, len(check))
assert.Equal(prefix, check[:len(prefix)])
h := sha1.New()
h.Write([]byte(check))
assert.Equal(hash, fmt.Sprintf("%x", h.Sum(nil)))
}
require.Equal(1, len(point1))
require.Equal(1, len(point2))
checkStringFingerprint("HLL_AhABAAAAAAAAB9BIDQAJAAAUUaKsA4K/AtARkuMBsJwEyp8O",
6964, "c59fa799fe8e78ab5347de385bf2a7c5b8085882", point1[0].Value)
checkStringFingerprint("HLL_AhABAAAAAAAAB9Db0QAHAAAUaP6aAaSRAoK/Ap70B/xSysEE",
6996, "5f1696dfb455baab7fdb56ffd2197d27b09d6dcf", point2[0].Value)
m := query.NewStringMergeHllReducer()
m.AggregateString(&point1[0])
m.AggregateString(&point2[0])
merged := m.Emit()
require.Equal(1, len(merged))
checkStringFingerprint("HLL_AhAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAA",
87396, "e5320860aa322efe9af268e171df916d2186c75f", merged[0].Value)
m.AggregateString(&query.StringPoint{
Time: query.ZeroTime,
Value: "some random string",
})
mergedError := m.Emit()
// Merging a string that is not a marshaled sketch surfaces as a marshaled error:
require.Equal(1, len(mergedError))
assert.Equal("HLLERROR bad prefix for hll.Plus", mergedError[0].Value)
c := query.NewCountHllReducer()
c.AggregateString(&merged[0])
counted := c.Emit()
require.Equal(1, len(counted))
// 4000 points were aggregated, 3000 of them distinct; the HLL estimate 2994 ≈ 3000
assert.Equal(uint64(2994), counted[0].Value)
c.AggregateString(&query.StringPoint{
Time: query.ZeroTime,
Value: "HLLERROR bad prefix for hll.Plus",
})
counted = c.Emit()
require.Equal(1, len(counted))
// When we hit marshal/unmarshal errors, count_hll emits 0
assert.Equal(uint64(0), counted[0].Value)
}


@@ -144,6 +144,13 @@ func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
Args: call.Args,
}
}
// When merging the sum_hll() function, use merge_hll() to combine the partial sketches.
if call.Name == "sum_hll" {
opt.Expr = &influxql.Call{
Name: "merge_hll",
Args: call.Args,
}
}
return NewCallIterator(itr, opt)
}
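This mirrors the existing rewrite of count() to sum() just above it: each shard produces a partial sketch via sum_hll, and the merge stage combines those sketches with merge_hll rather than re-aggregating raw points.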


@@ -1568,6 +1568,64 @@ func TestIterator_EncodeDecode(t *testing.T) {
}
}
// Test implementation of query.IntegerIterator
type IntegerConstIterator struct {
numPoints int
Closed bool
stats query.IteratorStats
point query.IntegerPoint
}
func BenchmarkIterator_Aggregator(b *testing.B) {
input := &IntegerConstIterator{
numPoints: b.N,
Closed: false,
stats: query.IteratorStats{},
point: query.IntegerPoint{
Name: "constPoint",
Value: 1,
},
}
opt := query.IteratorOptions{
Interval: query.Interval{
Duration: 100 * time.Minute,
},
Expr: &influxql.Call{
Name: "count",
},
}
counter, err := query.NewCallIterator(input, opt)
if err != nil {
b.Fatalf("Bad counter: %v", err)
}
b.ResetTimer()
point, err := counter.(query.IntegerIterator).Next()
if err != nil {
b.Fatalf("Unexpected error %v", err)
}
if point == nil {
b.Fatal("Expected point not to be nil")
}
if point.Value != int64(b.N) {
b.Fatalf("Expected %v != %v points", b.N, point.Value)
}
}
func (itr *IntegerConstIterator) Stats() query.IteratorStats { return itr.stats }
func (itr *IntegerConstIterator) Close() error { itr.Closed = true; return nil }
// Next returns the constant point, advancing its timestamp, until numPoints is exhausted.
func (itr *IntegerConstIterator) Next() (*query.IntegerPoint, error) {
if itr.numPoints == 0 || itr.Closed {
return nil, nil
}
itr.numPoints--
itr.point.Time++
return &itr.point, nil
}
// Test implementation of influxql.FloatIterator
type FloatIterator struct {
Context context.Context


@@ -285,7 +285,7 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ
opt.Interval = Interval{}
return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeFitData, interval)
case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "exponential_moving_average", "double_exponential_moving_average", "triple_exponential_moving_average", "relative_strength_index", "triple_exponential_derivative", "kaufmans_efficiency_ratio", "kaufmans_adaptive_moving_average", "chande_momentum_oscillator", "elapsed":
case "count_hll", "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "exponential_moving_average", "double_exponential_moving_average", "triple_exponential_moving_average", "relative_strength_index", "triple_exponential_derivative", "kaufmans_efficiency_ratio", "kaufmans_adaptive_moving_average", "chande_momentum_oscillator", "elapsed":
if !opt.Interval.IsZero() {
if opt.Ascending {
opt.StartTime -= int64(opt.Interval.Duration)
@@ -301,6 +301,8 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ
}
switch expr.Name {
case "count_hll":
return NewCountHllIterator(input, opt)
case "derivative", "non_negative_derivative":
interval := opt.DerivativeInterval()
isNonNegative := (expr.Name == "non_negative_derivative")
@@ -541,7 +543,7 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ
}
}
fallthrough
case "min", "max", "sum", "first", "last", "mean":
case "min", "max", "sum", "first", "last", "mean", "sum_hll", "merge_hll":
return b.callIterator(ctx, expr, opt)
case "median":
opt.Ordered = true


@@ -17,6 +17,27 @@ import (
// Second represents a helper for type converting durations.
const Second = int64(time.Second)
func randomFloatSlice(seed int64, length int) []float64 {
r := rand.New(rand.NewSource(seed))
out := make([]float64, 0, length)
for i := 0; i < length; i++ {
out = append(out, r.Float64())
}
return out
}
func floatIterator(fSlice []float64, name, tags string, startTime, step int64) *FloatIterator {
p := make([]query.FloatPoint, 0, len(fSlice))
currentTime := startTime
for _, f := range fSlice {
p = append(p, query.FloatPoint{Name: name, Tags: ParseTags(tags), Time: currentTime, Value: f})
currentTime += step
}
return &FloatIterator{
Points: p,
}
}
func TestSelect(t *testing.T) {
for _, tt := range []struct {
name string
@@ -55,6 +76,30 @@ func TestSelect(t *testing.T) {
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(10)}},
},
},
{
name: "count_hll",
q: `SELECT count_hll(sum_hll(value)) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`,
typ: influxql.Float,
itrs: []query.Iterator{
floatIterator(randomFloatSlice(42, 2000), "cpu", "region=west,host=A", 0*Second, 1),
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 0 * Second, Value: 20},
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 11 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 31 * Second, Value: 100},
}},
floatIterator(randomFloatSlice(42, 3000)[1000:], "cpu", "region=south,host=A", 0*Second, 1),
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 0 * Second, Value: 20},
}},
},
rows: []query.Row{
// For the first aggregate there are 2000 points in each random series but only 3000 unique values between them; the HLL estimate 2994 ≈ 3000
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{uint64(2994)}},
{Time: 10 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{uint64(1)}},
{Time: 30 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{uint64(1)}},
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{uint64(1)}},
},
},
{
name: "Distinct_Float",
q: `SELECT distinct(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`,


@@ -151,9 +151,10 @@ func (h *Plus) Add(v []byte) {
if uint32(len(h.tmpSet))*100 > h.m {
h.mergeSparse()
if uint32(h.sparseList.Len()) > h.m {
h.toNormal()
}
}
if uint32(h.sparseList.Len()) > h.m {
h.mergeSparse()
h.toNormal()
}
} else {
i := bextr(x, 64-h.p, h.p) // {x63,...,x64-p}
@@ -241,6 +242,10 @@ func (h *Plus) MarshalBinary() (data []byte, err error) {
return nil, nil
}
if h.sparse {
h.mergeSparse()
}
// Marshal a version marker.
data = append(data, version)
@@ -251,7 +256,7 @@
// It's using the sparse representation.
data = append(data, byte(1))
// Add the tmp_set
// Add the tmp_set (should be empty)
tsdata, err := h.tmpSet.MarshalBinary()
if err != nil {
return nil, err


@@ -1,7 +1,6 @@
package hll
import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"math"
@@ -469,9 +468,11 @@ func TestPlus_Marshal_Unmarshal_Sparse(t *testing.T) {
h.sparse = true
h.tmpSet = map[uint32]struct{}{26: {}, 40: {}}
src := rand.New(rand.NewSource(6611))
// Add a bunch of values to the sparse representation.
for i := 0; i < 10; i++ {
h.sparseList.Append(uint32(rand.Int()))
h.sparseList.Append(uint32(src.Int()))
}
data, err := h.MarshalBinary()
@@ -501,9 +502,11 @@ func TestPlus_Marshal_Unmarshal_Dense(t *testing.T) {
h, _ := NewPlus(4)
h.sparse = false
src := rand.New(rand.NewSource(1688))
// Add a bunch of values to the dense representation.
for i := 0; i < 10; i++ {
h.denseList = append(h.denseList, uint8(rand.Int()))
h.denseList = append(h.denseList, uint8(src.Int()))
}
data, err := h.MarshalBinary()
@@ -539,9 +542,11 @@ func TestPlus_Marshal_Unmarshal_Count(t *testing.T) {
count := make(map[string]struct{}, 1000000)
h, _ := NewPlus(16)
src := rand.New(rand.NewSource(6828))
buf := make([]byte, 8)
for i := 0; i < 1000000; i++ {
if _, err := crand.Read(buf); err != nil {
if _, err := src.Read(buf); err != nil {
panic(err)
}
@@ -577,7 +582,7 @@
// Add some more values.
for i := 0; i < 1000000; i++ {
if _, err := crand.Read(buf); err != nil {
if _, err := src.Read(buf); err != nil {
panic(err)
}
@@ -605,13 +610,13 @@ func NewTestPlus(p uint8) *Plus {
}
// Generate random data to add to the sketch.
func genData(n int) [][]byte {
func genData(n int, src *rand.Rand) [][]byte {
out := make([][]byte, 0, n)
buf := make([]byte, 8)
for i := 0; i < n; i++ {
// generate 8 random bytes
n, err := rand.Read(buf)
n, err := src.Read(buf)
if err != nil {
panic(err)
} else if n != 8 {
@@ -630,10 +635,11 @@ func genData(n int) [][]byte {
var benchdata = map[int][][]byte{}
func benchmarkPlusAdd(b *testing.B, h *Plus, n int) {
src := rand.New(rand.NewSource(9938))
blobs, ok := benchdata[n]
if !ok {
// Generate it.
benchdata[n] = genData(n)
benchdata[n] = genData(n, src)
blobs = benchdata[n]
}


@@ -2437,7 +2437,7 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
if e.index.Type() == tsdb.TSI1IndexName {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
seriesOpt := opt
if len(opt.Dimensions) == 0 && call.Name == "count" {
if len(opt.Dimensions) == 0 && (call.Name == "count" || call.Name == "sum_hll") {
// no point ordering the series if we are just counting all of them
seriesOpt.Ordered = false
}
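This last change is what makes sum_hll useful to the 'SHOW CARDINALITY' meta-queries mentioned in the commit message: like count, the sketch does not depend on the order in which series are visited, so the engine can skip ordering them.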