Merge pull request #7415 from influxdata/md-sample
Add sample function to query languagepull/7388/head
commit
616d4d28d3
|
@ -4,7 +4,8 @@
|
|||
|
||||
### Features
|
||||
|
||||
- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language
|
||||
- [#7415](https://github.com/influxdata/influxdb/pull/7415): Add sample function to query language.
|
||||
- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language.
|
||||
- [#7120](https://github.com/influxdata/influxdb/issues/7120): Add additional statistics to query executor.
|
||||
- [#7135](https://github.com/influxdata/influxdb/pull/7135): Support enable HTTP service over unix domain socket. Thanks @oiooj
|
||||
- [#3634](https://github.com/influxdata/influxdb/issues/3634): Support mixed duration units.
|
||||
|
|
|
@ -1544,7 +1544,7 @@ func (s *SelectStatement) validSelectWithAggregate() error {
|
|||
onlySelectors := true
|
||||
for k := range calls {
|
||||
switch k {
|
||||
case "top", "bottom", "max", "min", "first", "last", "percentile":
|
||||
case "top", "bottom", "max", "min", "first", "last", "percentile", "sample":
|
||||
default:
|
||||
onlySelectors = false
|
||||
break
|
||||
|
@ -1615,6 +1615,30 @@ func (s *SelectStatement) validPercentileAggr(expr *Call) error {
|
|||
}
|
||||
}
|
||||
|
||||
// validPercentileAggr determines if PERCENTILE have valid arguments.
|
||||
func (s *SelectStatement) validSampleAggr(expr *Call) error {
|
||||
if err := s.validSelectWithAggregate(); err != nil {
|
||||
return err
|
||||
}
|
||||
if exp, got := 2, len(expr.Args); got != exp {
|
||||
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
|
||||
}
|
||||
|
||||
switch expr.Args[0].(type) {
|
||||
case *VarRef:
|
||||
// do nothing
|
||||
default:
|
||||
return fmt.Errorf("expected field argument in sample()")
|
||||
}
|
||||
|
||||
switch expr.Args[1].(type) {
|
||||
case *IntegerLiteral:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("expected integer argument in sample()")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
||||
for _, f := range s.Fields {
|
||||
for _, expr := range walkFunctionCalls(f.Expr) {
|
||||
|
@ -1709,6 +1733,10 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
|||
if err := s.validPercentileAggr(expr); err != nil {
|
||||
return err
|
||||
}
|
||||
case "sample":
|
||||
if err := s.validSampleAggr(expr); err != nil {
|
||||
return err
|
||||
}
|
||||
case "holt_winters", "holt_winters_with_fit":
|
||||
if exp, got := 3, len(expr.Args); got != exp {
|
||||
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
|
||||
|
|
|
@ -1246,3 +1246,40 @@ func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, inclu
|
|||
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
|
||||
}
|
||||
}
|
||||
|
||||
// NewSampleIterator returns an iterator
|
||||
func NewSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
|
||||
return newSampleIterator(input, opt, size)
|
||||
}
|
||||
|
||||
// newSampleIterator returns an iterator
|
||||
func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
case FloatIterator:
|
||||
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
||||
fn := NewFloatSampleReducer(size)
|
||||
return fn, fn
|
||||
}
|
||||
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
|
||||
case IntegerIterator:
|
||||
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
||||
fn := NewIntegerSampleReducer(size)
|
||||
return fn, fn
|
||||
}
|
||||
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
|
||||
case BooleanIterator:
|
||||
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
||||
fn := NewBooleanSampleReducer(size)
|
||||
return fn, fn
|
||||
}
|
||||
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
||||
fn := NewStringSampleReducer(size)
|
||||
return fn, fn
|
||||
}
|
||||
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -844,6 +844,33 @@ func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN in
|
|||
}
|
||||
}
|
||||
|
||||
func BenchmarkSampleIterator_1k(b *testing.B) { benchmarkSampleIterator(b, 1000) }
|
||||
func BenchmarkSampleIterator_100k(b *testing.B) { benchmarkSampleIterator(b, 100000) }
|
||||
func BenchmarkSampleIterator_1M(b *testing.B) { benchmarkSampleIterator(b, 1000000) }
|
||||
|
||||
func benchmarkSampleIterator(b *testing.B, pointN int) {
|
||||
b.ReportAllocs()
|
||||
|
||||
// Create a lightweight point generator.
|
||||
p := influxql.FloatPoint{Name: "cpu"}
|
||||
input := FloatPointGenerator{
|
||||
N: pointN,
|
||||
Fn: func(i int) *influxql.FloatPoint {
|
||||
p.Value = float64(i)
|
||||
return &p
|
||||
},
|
||||
}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
// Execute call against input.
|
||||
itr, err := influxql.NewSampleIterator(&input, influxql.IteratorOptions{}, 100)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
influxql.DrainIterator(itr)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDistinctIterator_1K(b *testing.B) { benchmarkDistinctIterator(b, 1000) }
|
||||
func BenchmarkDistinctIterator_100K(b *testing.B) { benchmarkDistinctIterator(b, 100000) }
|
||||
func BenchmarkDistinctIterator_1M(b *testing.B) { benchmarkDistinctIterator(b, 1000000) }
|
||||
|
|
|
@ -6,7 +6,11 @@
|
|||
|
||||
package influxql
|
||||
|
||||
import "sort"
|
||||
import (
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
// FloatPointAggregator aggregates points to produce a single point.
|
||||
type FloatPointAggregator interface {
|
||||
|
@ -377,6 +381,51 @@ func (r *FloatElapsedReducer) Emit() []IntegerPoint {
|
|||
return nil
|
||||
}
|
||||
|
||||
// FloatSampleReduces implements a reservoir sampling to calculate a random subset of points
|
||||
type FloatSampleReducer struct {
|
||||
count int // how many points we've iterated over
|
||||
rng *rand.Rand // random number generator for each reducer
|
||||
|
||||
points floatPoints // the reservoir
|
||||
}
|
||||
|
||||
// NewFloatSampleReducer creates a new FloatSampleReducer
|
||||
func NewFloatSampleReducer(size int) *FloatSampleReducer {
|
||||
return &FloatSampleReducer{
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
|
||||
points: make(floatPoints, size),
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateFloat aggregates a point into the reducer.
|
||||
func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) {
|
||||
r.count++
|
||||
// Fill the reservoir with the first n points
|
||||
if r.count-1 < len(r.points) {
|
||||
r.points[r.count-1] = *p
|
||||
return
|
||||
}
|
||||
|
||||
// Generate a random integer between 1 and the count and
|
||||
// if that number is less than the length of the slice
|
||||
// replace the point at that index rnd with p.
|
||||
rnd := rand.Intn(r.count)
|
||||
if rnd < len(r.points) {
|
||||
r.points[rnd] = *p
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the reservoir sample as many points.
|
||||
func (r *FloatSampleReducer) Emit() []FloatPoint {
|
||||
min := len(r.points)
|
||||
if r.count < min {
|
||||
min = r.count
|
||||
}
|
||||
pts := r.points[:min]
|
||||
sort.Sort(pts)
|
||||
return pts
|
||||
}
|
||||
|
||||
// IntegerPointAggregator aggregates points to produce a single point.
|
||||
type IntegerPointAggregator interface {
|
||||
AggregateInteger(p *IntegerPoint)
|
||||
|
@ -746,6 +795,51 @@ func (r *IntegerElapsedReducer) Emit() []IntegerPoint {
|
|||
return nil
|
||||
}
|
||||
|
||||
// IntegerSampleReduces implements a reservoir sampling to calculate a random subset of points
|
||||
type IntegerSampleReducer struct {
|
||||
count int // how many points we've iterated over
|
||||
rng *rand.Rand // random number generator for each reducer
|
||||
|
||||
points integerPoints // the reservoir
|
||||
}
|
||||
|
||||
// NewIntegerSampleReducer creates a new IntegerSampleReducer
|
||||
func NewIntegerSampleReducer(size int) *IntegerSampleReducer {
|
||||
return &IntegerSampleReducer{
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
|
||||
points: make(integerPoints, size),
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateInteger aggregates a point into the reducer.
|
||||
func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) {
|
||||
r.count++
|
||||
// Fill the reservoir with the first n points
|
||||
if r.count-1 < len(r.points) {
|
||||
r.points[r.count-1] = *p
|
||||
return
|
||||
}
|
||||
|
||||
// Generate a random integer between 1 and the count and
|
||||
// if that number is less than the length of the slice
|
||||
// replace the point at that index rnd with p.
|
||||
rnd := rand.Intn(r.count)
|
||||
if rnd < len(r.points) {
|
||||
r.points[rnd] = *p
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the reservoir sample as many points.
|
||||
func (r *IntegerSampleReducer) Emit() []IntegerPoint {
|
||||
min := len(r.points)
|
||||
if r.count < min {
|
||||
min = r.count
|
||||
}
|
||||
pts := r.points[:min]
|
||||
sort.Sort(pts)
|
||||
return pts
|
||||
}
|
||||
|
||||
// StringPointAggregator aggregates points to produce a single point.
|
||||
type StringPointAggregator interface {
|
||||
AggregateString(p *StringPoint)
|
||||
|
@ -1115,6 +1209,51 @@ func (r *StringElapsedReducer) Emit() []IntegerPoint {
|
|||
return nil
|
||||
}
|
||||
|
||||
// StringSampleReduces implements a reservoir sampling to calculate a random subset of points
|
||||
type StringSampleReducer struct {
|
||||
count int // how many points we've iterated over
|
||||
rng *rand.Rand // random number generator for each reducer
|
||||
|
||||
points stringPoints // the reservoir
|
||||
}
|
||||
|
||||
// NewStringSampleReducer creates a new StringSampleReducer
|
||||
func NewStringSampleReducer(size int) *StringSampleReducer {
|
||||
return &StringSampleReducer{
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
|
||||
points: make(stringPoints, size),
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateString aggregates a point into the reducer.
|
||||
func (r *StringSampleReducer) AggregateString(p *StringPoint) {
|
||||
r.count++
|
||||
// Fill the reservoir with the first n points
|
||||
if r.count-1 < len(r.points) {
|
||||
r.points[r.count-1] = *p
|
||||
return
|
||||
}
|
||||
|
||||
// Generate a random integer between 1 and the count and
|
||||
// if that number is less than the length of the slice
|
||||
// replace the point at that index rnd with p.
|
||||
rnd := rand.Intn(r.count)
|
||||
if rnd < len(r.points) {
|
||||
r.points[rnd] = *p
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the reservoir sample as many points.
|
||||
func (r *StringSampleReducer) Emit() []StringPoint {
|
||||
min := len(r.points)
|
||||
if r.count < min {
|
||||
min = r.count
|
||||
}
|
||||
pts := r.points[:min]
|
||||
sort.Sort(pts)
|
||||
return pts
|
||||
}
|
||||
|
||||
// BooleanPointAggregator aggregates points to produce a single point.
|
||||
type BooleanPointAggregator interface {
|
||||
AggregateBoolean(p *BooleanPoint)
|
||||
|
@ -1483,3 +1622,48 @@ func (r *BooleanElapsedReducer) Emit() []IntegerPoint {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BooleanSampleReduces implements a reservoir sampling to calculate a random subset of points
|
||||
type BooleanSampleReducer struct {
|
||||
count int // how many points we've iterated over
|
||||
rng *rand.Rand // random number generator for each reducer
|
||||
|
||||
points booleanPoints // the reservoir
|
||||
}
|
||||
|
||||
// NewBooleanSampleReducer creates a new BooleanSampleReducer
|
||||
func NewBooleanSampleReducer(size int) *BooleanSampleReducer {
|
||||
return &BooleanSampleReducer{
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
|
||||
points: make(booleanPoints, size),
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateBoolean aggregates a point into the reducer.
|
||||
func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) {
|
||||
r.count++
|
||||
// Fill the reservoir with the first n points
|
||||
if r.count-1 < len(r.points) {
|
||||
r.points[r.count-1] = *p
|
||||
return
|
||||
}
|
||||
|
||||
// Generate a random integer between 1 and the count and
|
||||
// if that number is less than the length of the slice
|
||||
// replace the point at that index rnd with p.
|
||||
rnd := rand.Intn(r.count)
|
||||
if rnd < len(r.points) {
|
||||
r.points[rnd] = *p
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the reservoir sample as many points.
|
||||
func (r *BooleanSampleReducer) Emit() []BooleanPoint {
|
||||
min := len(r.points)
|
||||
if r.count < min {
|
||||
min = r.count
|
||||
}
|
||||
pts := r.points[:min]
|
||||
sort.Sort(pts)
|
||||
return pts
|
||||
}
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package influxql
|
||||
|
||||
import "sort"
|
||||
import (
|
||||
"sort"
|
||||
"time"
|
||||
"math/rand"
|
||||
)
|
||||
|
||||
{{with $types := .}}{{range $k := $types}}
|
||||
|
||||
|
@ -166,5 +170,50 @@ func (r *{{$k.Name}}ElapsedReducer) Emit() []IntegerPoint {
|
|||
return nil
|
||||
}
|
||||
|
||||
// {{$k.Name}}SampleReduces implements a reservoir sampling to calculate a random subset of points
|
||||
type {{$k.Name}}SampleReducer struct {
|
||||
count int // how many points we've iterated over
|
||||
rng *rand.Rand // random number generator for each reducer
|
||||
|
||||
points {{$k.name}}Points // the reservoir
|
||||
}
|
||||
|
||||
// New{{$k.Name}}SampleReducer creates a new {{$k.Name}}SampleReducer
|
||||
func New{{$k.Name}}SampleReducer(size int) *{{$k.Name}}SampleReducer {
|
||||
return &{{$k.Name}}SampleReducer{
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
|
||||
points: make({{$k.name}}Points, size),
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate{{$k.Name}} aggregates a point into the reducer.
|
||||
func (r *{{$k.Name}}SampleReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
|
||||
r.count++
|
||||
// Fill the reservoir with the first n points
|
||||
if r.count-1 < len(r.points) {
|
||||
r.points[r.count-1] = *p
|
||||
return
|
||||
}
|
||||
|
||||
// Generate a random integer between 1 and the count and
|
||||
// if that number is less than the length of the slice
|
||||
// replace the point at that index rnd with p.
|
||||
rnd := rand.Intn(r.count)
|
||||
if rnd < len(r.points) {
|
||||
r.points[rnd] = *p
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the reservoir sample as many points.
|
||||
func (r *{{$k.Name}}SampleReducer) Emit() []{{$k.Name}}Point {
|
||||
min := len(r.points)
|
||||
if r.count < min {
|
||||
min = r.count
|
||||
}
|
||||
pts := r.points[:min]
|
||||
sort.Sort(pts)
|
||||
return pts
|
||||
}
|
||||
|
||||
|
||||
{{end}}{{end}}
|
||||
|
|
|
@ -5,7 +5,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/pkg/deep"
|
||||
)
|
||||
|
||||
func almostEqual(got, exp float64) bool {
|
||||
|
@ -384,3 +386,108 @@ func TestHoltWinters_MaxTime(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSample_AllSamplesSeen attempts to verify that it is possible
|
||||
// to get every subsample in a reasonable number of iterations.
|
||||
//
|
||||
// The idea here is that 6 iterations should be enough to hit every possible
|
||||
// sequence atleast once.
|
||||
func TestSample_AllSamplesSeen(t *testing.T) {
|
||||
|
||||
ps := []influxql.FloatPoint{
|
||||
{Time: 1, Value: 1},
|
||||
{Time: 2, Value: 2},
|
||||
{Time: 3, Value: 3},
|
||||
}
|
||||
|
||||
// List of all the possible subsamples
|
||||
samples := [][]influxql.FloatPoint{
|
||||
{
|
||||
{Time: 1, Value: 1},
|
||||
{Time: 2, Value: 2},
|
||||
},
|
||||
{
|
||||
{Time: 1, Value: 1},
|
||||
{Time: 3, Value: 3},
|
||||
},
|
||||
{
|
||||
{Time: 2, Value: 2},
|
||||
{Time: 3, Value: 3},
|
||||
},
|
||||
}
|
||||
|
||||
// 6 iterations should be more than sufficient to garentee that
|
||||
// we hit every possible subsample.
|
||||
for i := 0; i < 6; i++ {
|
||||
s := influxql.NewFloatSampleReducer(2)
|
||||
for _, p := range ps {
|
||||
s.AggregateFloat(&p)
|
||||
}
|
||||
|
||||
points := s.Emit()
|
||||
|
||||
// if samples is empty we've seen every sample, so we're done
|
||||
if len(samples) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for i, sample := range samples {
|
||||
// if we find a sample that it matches, remove it from
|
||||
// this list of possible samples
|
||||
if deep.Equal(sample, points) {
|
||||
samples = append(samples[:i], samples[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// If we missed a sample, report the error
|
||||
if exp, got := 0, len(samples); exp != got {
|
||||
t.Fatalf("expected to get every sample: got %d, exp %d", got, exp)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSample_SampleSizeLessThanNumPoints(t *testing.T) {
|
||||
s := influxql.NewFloatSampleReducer(2)
|
||||
|
||||
ps := []influxql.FloatPoint{
|
||||
{Time: 1, Value: 1},
|
||||
{Time: 2, Value: 2},
|
||||
{Time: 3, Value: 3},
|
||||
}
|
||||
|
||||
for _, p := range ps {
|
||||
s.AggregateFloat(&p)
|
||||
}
|
||||
|
||||
points := s.Emit()
|
||||
|
||||
if exp, got := 2, len(points); exp != got {
|
||||
t.Fatalf("unexpected number of points emitted: got %d exp %d", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSample_SampleSizeGreaterThanNumPoints(t *testing.T) {
|
||||
s := influxql.NewFloatSampleReducer(4)
|
||||
|
||||
ps := []influxql.FloatPoint{
|
||||
{Time: 1, Value: 1},
|
||||
{Time: 2, Value: 2},
|
||||
{Time: 3, Value: 3},
|
||||
}
|
||||
|
||||
for _, p := range ps {
|
||||
s.AggregateFloat(&p)
|
||||
}
|
||||
|
||||
points := s.Emit()
|
||||
|
||||
if exp, got := len(ps), len(points); exp != got {
|
||||
t.Fatalf("unexpected number of points emitted: got %d exp %d", got, exp)
|
||||
}
|
||||
|
||||
if !deep.Equal(ps, points) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(points))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,6 +170,18 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
},
|
||||
},
|
||||
|
||||
// sample
|
||||
{
|
||||
s: `SELECT sample(field1, 100) FROM myseries;`,
|
||||
stmt: &influxql.SelectStatement{
|
||||
IsRawQuery: false,
|
||||
Fields: []*influxql.Field{
|
||||
{Expr: &influxql.Call{Name: "sample", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.IntegerLiteral{Val: 100}}}},
|
||||
},
|
||||
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
|
||||
},
|
||||
},
|
||||
|
||||
// derivative
|
||||
{
|
||||
s: `SELECT derivative(field1, 1h) FROM myseries;`,
|
||||
|
|
|
@ -254,6 +254,14 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec
|
|||
return nil, err
|
||||
}
|
||||
return NewIntervalIterator(input, opt), nil
|
||||
case "sample":
|
||||
input, err := buildExprIterator(expr.Args[0], ic, opt, selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
size := expr.Args[1].(*IntegerLiteral)
|
||||
|
||||
return newSampleIterator(input, opt, int(size.Val))
|
||||
case "holt_winters", "holt_winters_with_fit":
|
||||
input, err := buildExprIterator(expr.Args[0], ic, opt, selector)
|
||||
if err != nil {
|
||||
|
|
|
@ -1515,6 +1515,118 @@ func TestSelect_Percentile_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure a SELECT sample() query can be executed.
|
||||
func TestSelect_Sample_Float(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &FloatIterator{Points: []influxql.FloatPoint{
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20},
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: 10},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: 19},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: 2},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: 10}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 19}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: 2}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a SELECT sample() query can be executed.
|
||||
func TestSelect_Sample_Integer(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &IntegerIterator{Points: []influxql.IntegerPoint{
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20},
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: 10},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: 19},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: 2},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}},
|
||||
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: 10}},
|
||||
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 19}},
|
||||
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: 2}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a SELECT sample() query can be executed.
|
||||
func TestSelect_Sample_Boolean(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &BooleanIterator{Points: []influxql.BooleanPoint{
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: true},
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: false},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: false},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: true},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: true}},
|
||||
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: false}},
|
||||
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: false}},
|
||||
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: true}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a SELECT sample() query can be executed.
|
||||
func TestSelect_Sample_String(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &StringIterator{Points: []influxql.StringPoint{
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: "a"},
|
||||
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: "b"},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: "c"},
|
||||
{Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: "d"},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: "a"}},
|
||||
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: "b"}},
|
||||
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: "c"}},
|
||||
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: "d"}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a simple raw SELECT statement can be executed.
|
||||
func TestSelect_Raw(t *testing.T) {
|
||||
// Mock two iterators -- one for each value in the query.
|
||||
|
|
Loading…
Reference in New Issue