Ensure the input for certain functions in the query engine are ordered

The following functions require ordered input but were not guaranteed to
receive ordered input:

* `distinct()`
* `sample()`
* `holt_winters()`
* `holt_winters_with_fit()`
* `derivative()`
* `non_negative_derivative()`
* `difference()`
* `moving_average()`
* `elapsed()`
* `cumulative_sum()`
* `top()`
* `bottom()`

These function calls have now been modified to request that their input
be ordered by the query engine. This will prevent the improper output
that could have been caused by multiple series being merged together or
multiple shards being merged together potentially incorrectly when no
time grouping was specified.

Two additional functions were already correct to begin with (so there
are no bugs with these two, but I'm including their names for
completeness).

* `median()`
* `percentile()`
pull/8220/head
Jonathan A. Sternberg 2017-03-27 16:34:53 -05:00
parent 3c0d1c1bb5
commit b1caafe82f
3 changed files with 92 additions and 12 deletions

View File

@ -5,6 +5,7 @@
- [#8022](https://github.com/influxdata/influxdb/issues/8022): Segment violation in models.Tags.Get
- [#8155](https://github.com/influxdata/influxdb/pull/8155): Simplify admin user check.
- [#8167](https://github.com/influxdata/influxdb/issues/8167): Fix a regression when math was used with selectors.
- [#8175](https://github.com/influxdata/influxdb/issues/8175): Ensure the input for certain functions in the query engine are ordered.
## v1.2.2 [2017-03-14]

View File

@ -5402,6 +5402,73 @@ func TestServer_Query_AcrossShardsAndFields(t *testing.T) {
}
}
func TestServer_Query_OrderedAcrossShards(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`cpu value=7 %d`, mustParseTime(time.RFC3339Nano, "2010-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=14 %d`, mustParseTime(time.RFC3339Nano, "2010-01-08T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=28 %d`, mustParseTime(time.RFC3339Nano, "2010-01-15T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=56 %d`, mustParseTime(time.RFC3339Nano, "2010-01-22T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=112 %d`, mustParseTime(time.RFC3339Nano, "2010-01-29T00:00:00Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: "derivative",
params: url.Values{"db": []string{"db0"}},
command: `SELECT derivative(value, 24h) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-01-08T00:00:00Z",1],["2010-01-15T00:00:00Z",2],["2010-01-22T00:00:00Z",4],["2010-01-29T00:00:00Z",8]]}]}]}`,
},
&Query{
name: "non_negative_derivative",
params: url.Values{"db": []string{"db0"}},
command: `SELECT non_negative_derivative(value, 24h) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","non_negative_derivative"],"values":[["2010-01-08T00:00:00Z",1],["2010-01-15T00:00:00Z",2],["2010-01-22T00:00:00Z",4],["2010-01-29T00:00:00Z",8]]}]}]}`,
},
&Query{
name: "difference",
params: url.Values{"db": []string{"db0"}},
command: `SELECT difference(value) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-01-08T00:00:00Z",7],["2010-01-15T00:00:00Z",14],["2010-01-22T00:00:00Z",28],["2010-01-29T00:00:00Z",56]]}]}]}`,
},
&Query{
name: "cumulative_sum",
params: url.Values{"db": []string{"db0"}},
command: `SELECT cumulative_sum(value) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-01-01T00:00:00Z",7],["2010-01-08T00:00:00Z",21],["2010-01-15T00:00:00Z",49],["2010-01-22T00:00:00Z",105],["2010-01-29T00:00:00Z",217]]}]}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
func TestServer_Query_Where_Fields(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())

View File

@ -707,25 +707,31 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
// TODO(jsternberg): Refactor this. This section needs to die in a fire.
switch expr.Name {
case "distinct":
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
input, err = NewDistinctIterator(input, b.opt)
input, err = NewDistinctIterator(input, opt)
if err != nil {
return nil, err
}
return NewIntervalIterator(input, b.opt), nil
return NewIntervalIterator(input, opt), nil
case "sample":
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
size := expr.Args[1].(*IntegerLiteral)
return newSampleIterator(input, b.opt, int(size.Val))
return newSampleIterator(input, opt, int(size.Val))
case "holt_winters", "holt_winters_with_fit":
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
@ -736,7 +742,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
interval := b.opt.Interval.Duration
// Redefine interval to be unbounded to capture all aggregate results
opt := b.opt
opt.StartTime = MinTime
opt.EndTime = MaxTime
opt.Interval = Interval{}
@ -751,6 +756,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
opt.EndTime += int64(opt.Interval.Duration)
}
}
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
@ -780,11 +786,13 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name))
case "cumulative_sum":
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
return newCumulativeSumIterator(input, b.opt)
return newCumulativeSumIterator(input, opt)
}
itr, err := func() (Iterator, error) {
@ -893,12 +901,14 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}
n := expr.Args[len(expr.Args)-1].(*IntegerLiteral)
return newTopIterator(input, b.opt, n, tags)
return newTopIterator(input, opt, n, tags)
case "bottom":
var tags []int
if len(expr.Args) < 2 {
@ -917,7 +927,9 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}