Ensure the input for certain functions in the query engine are ordered

The following functions require ordered input but were not guaranteed to
receive it:

* `distinct()`
* `sample()`
* `holt_winters()`
* `holt_winters_with_fit()`
* `derivative()`
* `non_negative_derivative()`
* `difference()`
* `moving_average()`
* `elapsed()`
* `cumulative_sum()`
* `top()`
* `bottom()`

These function calls have now been modified to request that their input
be ordered by the query engine. This prevents the incorrect output that
could result from multiple series, or multiple shards, being merged in
arrival order rather than time order, which was possible when no time
grouping was specified.
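
To make the failure mode concrete, here is a small standalone sketch (this
is not InfluxDB code; the `point` type and `difference` helper are simplified
stand-ins) of how a function like `difference()` goes wrong when points from
two shards are merged without ordering by time:

```go
package main

import (
	"fmt"
	"sort"
)

// point is a simplified stand-in for a query engine point.
type point struct {
	time  int64
	value float64
}

// difference mimics difference(): the delta between consecutive input points.
func difference(points []point) []float64 {
	out := make([]float64, 0, len(points))
	for i := 1; i < len(points); i++ {
		out = append(out, points[i].value-points[i-1].value)
	}
	return out
}

func main() {
	// Points as they might arrive when two shards are merged in arrival order
	// rather than time order.
	merged := []point{{3, 28}, {1, 7}, {4, 56}, {2, 14}}
	fmt.Println("unordered:", difference(merged)) // [-21 49 -42]: nonsense deltas

	sort.Slice(merged, func(i, j int) bool { return merged[i].time < merged[j].time })
	fmt.Println("ordered:  ", difference(merged)) // [7 14 28]: the intended result
}
```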

Two additional functions were already correct to begin with, so there is
no bug to fix for them; I'm including them for completeness:

* `median()`
* `percentile()`
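
The change itself is mechanical: as the diff below shows, each affected case
copies the iterator options and sets `Ordered` before building its input
iterator. A minimal standalone sketch of that shape (here `IteratorOptions`
and `buildInput` are simplified stand-ins for the real `influxql` types, not
the actual API):

```go
package main

import "fmt"

// IteratorOptions is a simplified stand-in; only the Ordered flag matters here.
type IteratorOptions struct {
	Ordered bool
}

// buildInput stands in for building the input iterator and reports how it
// would merge points from multiple series and shards.
func buildInput(opt IteratorOptions) string {
	if opt.Ordered {
		return "merge series and shards in time order"
	}
	return "merge in arrival order (unsafe for difference, cumulative_sum, ...)"
}

func main() {
	base := IteratorOptions{} // the options handed to the call being built
	opt := base               // copy, so the caller's options stay untouched
	opt.Ordered = true        // the fix: request ordered input for this call
	fmt.Println(buildInput(opt))
}
```

Copying the options into a local `opt` keeps the change scoped to the one
call being built rather than mutating the shared options.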
pull/8214/head
Jonathan A. Sternberg 2017-03-27 16:34:53 -05:00
parent 24109468c3
commit 7e0ed1f5e5
3 changed files with 92 additions and 12 deletions


@@ -28,6 +28,7 @@
- [#8093](https://github.com/influxdata/influxdb/issues/8093): Fix the time range when an exact timestamp is selected.
- [#8174](https://github.com/influxdata/influxdb/issues/8174): Fix query parser when using addition and subtraction without spaces.
- [#8167](https://github.com/influxdata/influxdb/issues/8167): Fix a regression when math was used with selectors.
- [#8175](https://github.com/influxdata/influxdb/issues/8175): Ensure the input for certain functions in the query engine are ordered.
## v1.2.2 [2017-03-14]


@@ -708,25 +708,31 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
// TODO(jsternberg): Refactor this. This section needs to die in a fire.
switch expr.Name {
case "distinct":
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
input, err = NewDistinctIterator(input, b.opt)
input, err = NewDistinctIterator(input, opt)
if err != nil {
return nil, err
}
return NewIntervalIterator(input, b.opt), nil
return NewIntervalIterator(input, opt), nil
case "sample":
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
size := expr.Args[1].(*IntegerLiteral)
return newSampleIterator(input, b.opt, int(size.Val))
return newSampleIterator(input, opt, int(size.Val))
case "holt_winters", "holt_winters_with_fit":
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
@@ -737,7 +743,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
interval := b.opt.Interval.Duration
// Redefine interval to be unbounded to capture all aggregate results
opt := b.opt
opt.StartTime = MinTime
opt.EndTime = MaxTime
opt.Interval = Interval{}
@@ -752,6 +757,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
opt.EndTime += int64(opt.Interval.Duration)
}
}
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
@@ -781,11 +787,13 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name))
case "cumulative_sum":
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
return newCumulativeSumIterator(input, b.opt)
return newCumulativeSumIterator(input, opt)
}
itr, err := func() (Iterator, error) {
@@ -894,12 +902,14 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}
n := expr.Args[len(expr.Args)-1].(*IntegerLiteral)
return newTopIterator(input, b.opt, n, tags)
return newTopIterator(input, opt, n, tags)
case "bottom":
var tags []int
if len(expr.Args) < 2 {
@@ -918,7 +928,9 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
}
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}


@@ -5577,6 +5577,73 @@ func TestServer_Query_AcrossShardsAndFields(t *testing.T) {
}
}
func TestServer_Query_OrderedAcrossShards(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`cpu value=7 %d`, mustParseTime(time.RFC3339Nano, "2010-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=14 %d`, mustParseTime(time.RFC3339Nano, "2010-01-08T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=28 %d`, mustParseTime(time.RFC3339Nano, "2010-01-15T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=56 %d`, mustParseTime(time.RFC3339Nano, "2010-01-22T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=112 %d`, mustParseTime(time.RFC3339Nano, "2010-01-29T00:00:00Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: "derivative",
params: url.Values{"db": []string{"db0"}},
command: `SELECT derivative(value, 24h) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-01-08T00:00:00Z",1],["2010-01-15T00:00:00Z",2],["2010-01-22T00:00:00Z",4],["2010-01-29T00:00:00Z",8]]}]}]}`,
},
&Query{
name: "non_negative_derivative",
params: url.Values{"db": []string{"db0"}},
command: `SELECT non_negative_derivative(value, 24h) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","non_negative_derivative"],"values":[["2010-01-08T00:00:00Z",1],["2010-01-15T00:00:00Z",2],["2010-01-22T00:00:00Z",4],["2010-01-29T00:00:00Z",8]]}]}]}`,
},
&Query{
name: "difference",
params: url.Values{"db": []string{"db0"}},
command: `SELECT difference(value) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-01-08T00:00:00Z",7],["2010-01-15T00:00:00Z",14],["2010-01-22T00:00:00Z",28],["2010-01-29T00:00:00Z",56]]}]}]}`,
},
&Query{
name: "cumulative_sum",
params: url.Values{"db": []string{"db0"}},
command: `SELECT cumulative_sum(value) FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-01-01T00:00:00Z",7],["2010-01-08T00:00:00Z",21],["2010-01-15T00:00:00Z",49],["2010-01-22T00:00:00Z",105],["2010-01-29T00:00:00Z",217]]}]}]}`,
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
func TestServer_Query_Where_Fields(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())