diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ea576545f..26ed42ad99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ ### Features -- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language +- [#7415](https://github.com/influxdata/influxdb/pull/7415): Add sample function to query language. +- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language. - [#7120](https://github.com/influxdata/influxdb/issues/7120): Add additional statistics to query executor. - [#7135](https://github.com/influxdata/influxdb/pull/7135): Support enable HTTP service over unix domain socket. Thanks @oiooj - [#3634](https://github.com/influxdata/influxdb/issues/3634): Support mixed duration units. diff --git a/influxql/ast.go b/influxql/ast.go index b58ffb6328..50a6b8823e 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1544,7 +1544,7 @@ func (s *SelectStatement) validSelectWithAggregate() error { onlySelectors := true for k := range calls { switch k { - case "top", "bottom", "max", "min", "first", "last", "percentile": + case "top", "bottom", "max", "min", "first", "last", "percentile", "sample": default: onlySelectors = false break @@ -1615,6 +1615,30 @@ func (s *SelectStatement) validPercentileAggr(expr *Call) error { } } +// validPercentileAggr determines if PERCENTILE have valid arguments. +func (s *SelectStatement) validSampleAggr(expr *Call) error { + if err := s.validSelectWithAggregate(); err != nil { + return err + } + if exp, got := 2, len(expr.Args); got != exp { + return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) + } + + switch expr.Args[0].(type) { + case *VarRef: + // do nothing + default: + return fmt.Errorf("expected field argument in sample()") + } + + switch expr.Args[1].(type) { + case *IntegerLiteral: + return nil + default: + return fmt.Errorf("expected integer argument in sample()") + } +} + func (s *SelectStatement) validateAggregates(tr targetRequirement) error { for _, f := range s.Fields { for _, expr := range walkFunctionCalls(f.Expr) { @@ -1709,6 +1733,10 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { if err := s.validPercentileAggr(expr); err != nil { return err } + case "sample": + if err := s.validSampleAggr(expr); err != nil { + return err + } case "holt_winters", "holt_winters_with_fit": if exp, got := 3, len(expr.Args); got != exp { return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 049e8d3e05..0c5b05ec38 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -1246,3 +1246,40 @@ func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, inclu return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input) } } + +// NewSampleIterator returns an iterator +func NewSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) { + return newSampleIterator(input, opt, size) +} + +// newSampleIterator returns an iterator +func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) { + switch input := input.(type) { + case FloatIterator: + createFn := func() (FloatPointAggregator, FloatPointEmitter) { + fn := NewFloatSampleReducer(size) + return fn, fn + } + return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil + case IntegerIterator: + createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { + fn := NewIntegerSampleReducer(size) + return fn, fn + } + return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil + case BooleanIterator: + createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { + fn := NewBooleanSampleReducer(size) + return fn, fn + } + return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil + case StringIterator: + createFn := func() (StringPointAggregator, StringPointEmitter) { + fn := NewStringSampleReducer(size) + return fn, fn + } + return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil + default: + return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input) + } +} diff --git a/influxql/call_iterator_test.go b/influxql/call_iterator_test.go index 78b5a4ead7..30b34e5c14 100644 --- a/influxql/call_iterator_test.go +++ b/influxql/call_iterator_test.go @@ -844,6 +844,33 @@ func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN in } } +func BenchmarkSampleIterator_1k(b *testing.B) { benchmarkSampleIterator(b, 1000) } +func BenchmarkSampleIterator_100k(b *testing.B) { benchmarkSampleIterator(b, 100000) } +func BenchmarkSampleIterator_1M(b *testing.B) { benchmarkSampleIterator(b, 1000000) } + +func benchmarkSampleIterator(b *testing.B, pointN int) { + b.ReportAllocs() + + // Create a lightweight point generator. + p := influxql.FloatPoint{Name: "cpu"} + input := FloatPointGenerator{ + N: pointN, + Fn: func(i int) *influxql.FloatPoint { + p.Value = float64(i) + return &p + }, + } + + for i := 0; i < b.N; i++ { + // Execute call against input. + itr, err := influxql.NewSampleIterator(&input, influxql.IteratorOptions{}, 100) + if err != nil { + b.Fatal(err) + } + influxql.DrainIterator(itr) + } +} + func BenchmarkDistinctIterator_1K(b *testing.B) { benchmarkDistinctIterator(b, 1000) } func BenchmarkDistinctIterator_100K(b *testing.B) { benchmarkDistinctIterator(b, 100000) } func BenchmarkDistinctIterator_1M(b *testing.B) { benchmarkDistinctIterator(b, 1000000) } diff --git a/influxql/functions.gen.go b/influxql/functions.gen.go index f285a36279..789dd485dd 100644 --- a/influxql/functions.gen.go +++ b/influxql/functions.gen.go @@ -6,7 +6,11 @@ package influxql -import "sort" +import ( + "math/rand" + "sort" + "time" +) // FloatPointAggregator aggregates points to produce a single point. type FloatPointAggregator interface { @@ -377,6 +381,51 @@ func (r *FloatElapsedReducer) Emit() []IntegerPoint { return nil } +// FloatSampleReduces implements a reservoir sampling to calculate a random subset of points +type FloatSampleReducer struct { + count int // how many points we've iterated over + rng *rand.Rand // random number generator for each reducer + + points floatPoints // the reservoir +} + +// NewFloatSampleReducer creates a new FloatSampleReducer +func NewFloatSampleReducer(size int) *FloatSampleReducer { + return &FloatSampleReducer{ + rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ + points: make(floatPoints, size), + } +} + +// AggregateFloat aggregates a point into the reducer. +func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) { + r.count++ + // Fill the reservoir with the first n points + if r.count-1 < len(r.points) { + r.points[r.count-1] = *p + return + } + + // Generate a random integer between 1 and the count and + // if that number is less than the length of the slice + // replace the point at that index rnd with p. + rnd := rand.Intn(r.count) + if rnd < len(r.points) { + r.points[rnd] = *p + } +} + +// Emit emits the reservoir sample as many points. +func (r *FloatSampleReducer) Emit() []FloatPoint { + min := len(r.points) + if r.count < min { + min = r.count + } + pts := r.points[:min] + sort.Sort(pts) + return pts +} + // IntegerPointAggregator aggregates points to produce a single point. type IntegerPointAggregator interface { AggregateInteger(p *IntegerPoint) @@ -746,6 +795,51 @@ func (r *IntegerElapsedReducer) Emit() []IntegerPoint { return nil } +// IntegerSampleReduces implements a reservoir sampling to calculate a random subset of points +type IntegerSampleReducer struct { + count int // how many points we've iterated over + rng *rand.Rand // random number generator for each reducer + + points integerPoints // the reservoir +} + +// NewIntegerSampleReducer creates a new IntegerSampleReducer +func NewIntegerSampleReducer(size int) *IntegerSampleReducer { + return &IntegerSampleReducer{ + rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ + points: make(integerPoints, size), + } +} + +// AggregateInteger aggregates a point into the reducer. +func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) { + r.count++ + // Fill the reservoir with the first n points + if r.count-1 < len(r.points) { + r.points[r.count-1] = *p + return + } + + // Generate a random integer between 1 and the count and + // if that number is less than the length of the slice + // replace the point at that index rnd with p. + rnd := rand.Intn(r.count) + if rnd < len(r.points) { + r.points[rnd] = *p + } +} + +// Emit emits the reservoir sample as many points. +func (r *IntegerSampleReducer) Emit() []IntegerPoint { + min := len(r.points) + if r.count < min { + min = r.count + } + pts := r.points[:min] + sort.Sort(pts) + return pts +} + // StringPointAggregator aggregates points to produce a single point. type StringPointAggregator interface { AggregateString(p *StringPoint) @@ -1115,6 +1209,51 @@ func (r *StringElapsedReducer) Emit() []IntegerPoint { return nil } +// StringSampleReduces implements a reservoir sampling to calculate a random subset of points +type StringSampleReducer struct { + count int // how many points we've iterated over + rng *rand.Rand // random number generator for each reducer + + points stringPoints // the reservoir +} + +// NewStringSampleReducer creates a new StringSampleReducer +func NewStringSampleReducer(size int) *StringSampleReducer { + return &StringSampleReducer{ + rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ + points: make(stringPoints, size), + } +} + +// AggregateString aggregates a point into the reducer. +func (r *StringSampleReducer) AggregateString(p *StringPoint) { + r.count++ + // Fill the reservoir with the first n points + if r.count-1 < len(r.points) { + r.points[r.count-1] = *p + return + } + + // Generate a random integer between 1 and the count and + // if that number is less than the length of the slice + // replace the point at that index rnd with p. + rnd := rand.Intn(r.count) + if rnd < len(r.points) { + r.points[rnd] = *p + } +} + +// Emit emits the reservoir sample as many points. +func (r *StringSampleReducer) Emit() []StringPoint { + min := len(r.points) + if r.count < min { + min = r.count + } + pts := r.points[:min] + sort.Sort(pts) + return pts +} + // BooleanPointAggregator aggregates points to produce a single point. type BooleanPointAggregator interface { AggregateBoolean(p *BooleanPoint) @@ -1483,3 +1622,48 @@ func (r *BooleanElapsedReducer) Emit() []IntegerPoint { } return nil } + +// BooleanSampleReduces implements a reservoir sampling to calculate a random subset of points +type BooleanSampleReducer struct { + count int // how many points we've iterated over + rng *rand.Rand // random number generator for each reducer + + points booleanPoints // the reservoir +} + +// NewBooleanSampleReducer creates a new BooleanSampleReducer +func NewBooleanSampleReducer(size int) *BooleanSampleReducer { + return &BooleanSampleReducer{ + rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ + points: make(booleanPoints, size), + } +} + +// AggregateBoolean aggregates a point into the reducer. +func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) { + r.count++ + // Fill the reservoir with the first n points + if r.count-1 < len(r.points) { + r.points[r.count-1] = *p + return + } + + // Generate a random integer between 1 and the count and + // if that number is less than the length of the slice + // replace the point at that index rnd with p. + rnd := rand.Intn(r.count) + if rnd < len(r.points) { + r.points[rnd] = *p + } +} + +// Emit emits the reservoir sample as many points. +func (r *BooleanSampleReducer) Emit() []BooleanPoint { + min := len(r.points) + if r.count < min { + min = r.count + } + pts := r.points[:min] + sort.Sort(pts) + return pts +} diff --git a/influxql/functions.gen.go.tmpl b/influxql/functions.gen.go.tmpl index f2199a05db..b66742d7a0 100644 --- a/influxql/functions.gen.go.tmpl +++ b/influxql/functions.gen.go.tmpl @@ -1,6 +1,10 @@ package influxql -import "sort" +import ( +"sort" +"time" +"math/rand" +) {{with $types := .}}{{range $k := $types}} @@ -166,5 +170,50 @@ func (r *{{$k.Name}}ElapsedReducer) Emit() []IntegerPoint { return nil } +// {{$k.Name}}SampleReduces implements a reservoir sampling to calculate a random subset of points +type {{$k.Name}}SampleReducer struct { + count int // how many points we've iterated over + rng *rand.Rand // random number generator for each reducer + + points {{$k.name}}Points // the reservoir +} + +// New{{$k.Name}}SampleReducer creates a new {{$k.Name}}SampleReducer +func New{{$k.Name}}SampleReducer(size int) *{{$k.Name}}SampleReducer { + return &{{$k.Name}}SampleReducer{ + rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ + points: make({{$k.name}}Points, size), + } +} + +// Aggregate{{$k.Name}} aggregates a point into the reducer. +func (r *{{$k.Name}}SampleReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) { + r.count++ + // Fill the reservoir with the first n points + if r.count-1 < len(r.points) { + r.points[r.count-1] = *p + return + } + + // Generate a random integer between 1 and the count and + // if that number is less than the length of the slice + // replace the point at that index rnd with p. + rnd := rand.Intn(r.count) + if rnd < len(r.points) { + r.points[rnd] = *p + } +} + +// Emit emits the reservoir sample as many points. +func (r *{{$k.Name}}SampleReducer) Emit() []{{$k.Name}}Point { + min := len(r.points) + if r.count < min { + min = r.count + } + pts := r.points[:min] + sort.Sort(pts) + return pts +} + {{end}}{{end}} diff --git a/influxql/functions_test.go b/influxql/functions_test.go index cc2b91f006..8e556d1b96 100644 --- a/influxql/functions_test.go +++ b/influxql/functions_test.go @@ -5,7 +5,9 @@ import ( "testing" "time" + "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/pkg/deep" ) func almostEqual(got, exp float64) bool { @@ -384,3 +386,108 @@ func TestHoltWinters_MaxTime(t *testing.T) { } } } + +// TestSample_AllSamplesSeen attempts to verify that it is possible +// to get every subsample in a reasonable number of iterations. +// +// The idea here is that 6 iterations should be enough to hit every possible +// sequence atleast once. +func TestSample_AllSamplesSeen(t *testing.T) { + + ps := []influxql.FloatPoint{ + {Time: 1, Value: 1}, + {Time: 2, Value: 2}, + {Time: 3, Value: 3}, + } + + // List of all the possible subsamples + samples := [][]influxql.FloatPoint{ + { + {Time: 1, Value: 1}, + {Time: 2, Value: 2}, + }, + { + {Time: 1, Value: 1}, + {Time: 3, Value: 3}, + }, + { + {Time: 2, Value: 2}, + {Time: 3, Value: 3}, + }, + } + + // 6 iterations should be more than sufficient to garentee that + // we hit every possible subsample. + for i := 0; i < 6; i++ { + s := influxql.NewFloatSampleReducer(2) + for _, p := range ps { + s.AggregateFloat(&p) + } + + points := s.Emit() + + // if samples is empty we've seen every sample, so we're done + if len(samples) == 0 { + return + } + + for i, sample := range samples { + // if we find a sample that it matches, remove it from + // this list of possible samples + if deep.Equal(sample, points) { + samples = append(samples[:i], samples[i+1:]...) + } + } + + } + + // If we missed a sample, report the error + if exp, got := 0, len(samples); exp != got { + t.Fatalf("expected to get every sample: got %d, exp %d", got, exp) + } + +} + +func TestSample_SampleSizeLessThanNumPoints(t *testing.T) { + s := influxql.NewFloatSampleReducer(2) + + ps := []influxql.FloatPoint{ + {Time: 1, Value: 1}, + {Time: 2, Value: 2}, + {Time: 3, Value: 3}, + } + + for _, p := range ps { + s.AggregateFloat(&p) + } + + points := s.Emit() + + if exp, got := 2, len(points); exp != got { + t.Fatalf("unexpected number of points emitted: got %d exp %d", got, exp) + } +} + +func TestSample_SampleSizeGreaterThanNumPoints(t *testing.T) { + s := influxql.NewFloatSampleReducer(4) + + ps := []influxql.FloatPoint{ + {Time: 1, Value: 1}, + {Time: 2, Value: 2}, + {Time: 3, Value: 3}, + } + + for _, p := range ps { + s.AggregateFloat(&p) + } + + points := s.Emit() + + if exp, got := len(ps), len(points); exp != got { + t.Fatalf("unexpected number of points emitted: got %d exp %d", got, exp) + } + + if !deep.Equal(ps, points) { + t.Fatalf("unexpected points: %s", spew.Sdump(points)) + } +} diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 0cbaaa78c5..252f6f5a8d 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -170,6 +170,18 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // sample + { + s: `SELECT sample(field1, 100) FROM myseries;`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "sample", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.IntegerLiteral{Val: 100}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + }, + }, + // derivative { s: `SELECT derivative(field1, 1h) FROM myseries;`, diff --git a/influxql/select.go b/influxql/select.go index 48825a4401..219a677275 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -254,6 +254,14 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec return nil, err } return NewIntervalIterator(input, opt), nil + case "sample": + input, err := buildExprIterator(expr.Args[0], ic, opt, selector) + if err != nil { + return nil, err + } + size := expr.Args[1].(*IntegerLiteral) + + return newSampleIterator(input, opt, int(size.Val)) case "holt_winters", "holt_winters_with_fit": input, err := buildExprIterator(expr.Args[0], ic, opt, selector) if err != nil { diff --git a/influxql/select_test.go b/influxql/select_test.go index 7e09a17074..529ddf9cfc 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -1515,6 +1515,118 @@ func TestSelect_Percentile_Integer(t *testing.T) { } } +// Ensure a SELECT sample() query can be executed. +func TestSelect_Sample_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: 2}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: 10}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 19}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: 2}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure a SELECT sample() query can be executed. +func TestSelect_Sample_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: 2}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: 10}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 19}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: 2}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure a SELECT sample() query can be executed. +func TestSelect_Sample_Boolean(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &BooleanIterator{Points: []influxql.BooleanPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: true}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: true}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: true}}, + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: false}}, + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: false}}, + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure a SELECT sample() query can be executed. +func TestSelect_Sample_String(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &StringIterator{Points: []influxql.StringPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: "a"}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: "b"}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: "c"}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: "d"}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: "a"}}, + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: "b"}}, + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: "c"}}, + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: "d"}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Ensure a simple raw SELECT statement can be executed. func TestSelect_Raw(t *testing.T) { // Mock two iterators -- one for each value in the query.