fixes multiple selectors overwriting each other. fixes #4360

pull/4391/head
Cory LaNou 2015-10-09 16:45:05 -05:00
parent 86b5ad2096
commit 6787525912
3 changed files with 27 additions and 15 deletions

View File

@ -53,6 +53,7 @@
- [#4365](https://github.com/influxdb/influxdb/issues/4365): Prevent panic in DecodeSameTypeBlock
- [#4280](https://github.com/influxdb/influxdb/issues/4280): Only drop points matching WHERE clause
- [#4410](https://github.com/influxdb/influxdb/pull/4410): Fix infinite recursion in statement string(). Thanks @kostya-sh
- [#4360](https://github.com/influxdb/influxdb/issues/4360): Aggregate Selectors overwrite values during post-processing
## v0.9.4 [2015-09-14]

View File

@ -2606,6 +2606,12 @@ func TestServer_Query_AggregateSelectors(t *testing.T) {
command: `SELECT time, tx, min(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"results":[{"series":[{"name":"network","columns":["time","tx","min"],"values":[["2000-01-01T00:00:00Z",20,10],["2000-01-01T00:00:30Z",60,40],["2000-01-01T00:01:20Z",4,5]]}]}]}`,
},
&Query{
name: "max,min - baseline 30s",
params: url.Values{"db": []string{"db0"}},
command: `SELECT max(rx), min(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"results":[{"series":[{"name":"network","columns":["time","max","min"],"values":[["2000-01-01T00:00:00Z",40,10],["2000-01-01T00:00:30Z",50,40],["2000-01-01T00:01:00Z",90,5]]}]}]}`,
},
&Query{
name: "first - baseline 30s",
params: url.Values{"db": []string{"db0"}},

View File

@ -625,23 +625,23 @@ func (e *SelectExecutor) processFunctions(results [][]interface{}, columnNames [
}
func (e *SelectExecutor) processSelectors(results [][]interface{}, callPosition int, hasTimeField bool, columnNames []string) ([][]interface{}, error) {
for i, vals := range results {
for j := 1; j < len(vals); j++ {
switch v := vals[j].(type) {
// if the columns doesn't have enough columns, expand it
for i, columns := range results {
if len(columns) != len(columnNames) {
columns = append(columns, make([]interface{}, len(columnNames)-len(columns))...)
}
for j := 1; j < len(columns); j++ {
switch v := columns[j].(type) {
case PositionPoint:
tMin := vals[0].(time.Time)
results[i] = e.selectorPointToQueryResult(vals, hasTimeField, callPosition, v, tMin, columnNames)
tMin := columns[0].(time.Time)
results[i] = e.selectorPointToQueryResult(columns, hasTimeField, callPosition, v, tMin, columnNames)
}
}
}
return results, nil
}
func (e *SelectExecutor) selectorPointToQueryResult(row []interface{}, hasTimeField bool, columnIndex int, p PositionPoint, tMin time.Time, columnNames []string) []interface{} {
// if the row doesn't have enough columns, expand it
if len(row) != len(columnNames) {
row = append(row, make([]interface{}, len(columnNames)-len(row))...)
}
func (e *SelectExecutor) selectorPointToQueryResult(columns []interface{}, hasTimeField bool, columnIndex int, p PositionPoint, tMin time.Time, columnNames []string) []interface{} {
callCount := len(e.stmt.FunctionCalls())
if callCount == 1 {
tm := time.Unix(0, p.Time).UTC().Format(time.RFC3339Nano)
@ -649,29 +649,34 @@ func (e *SelectExecutor) selectorPointToQueryResult(row []interface{}, hasTimeFi
if len(e.stmt.Dimensions) > 0 && !hasTimeField {
tm = tMin.UTC().Format(time.RFC3339Nano)
}
row[0] = tm
columns[0] = tm
}
for i, c := range columnNames {
// skip over time, we already handled that above
if i == 0 {
continue
}
if (i == columnIndex && hasTimeField) || (i == columnIndex+1 && !hasTimeField) {
row[i] = p.Value
// Check to see if we previously processed this column, if so, continue
if _, ok := columns[i].(PositionPoint); !ok && columns[i] != nil {
continue
}
columns[i] = p.Value
continue
}
if callCount == 1 {
// Always favor fields over tags if there is a name collision
if t, ok := p.Fields[c]; ok {
row[i] = t
columns[i] = t
} else if t, ok := p.Tags[c]; ok {
// look in the tags for a value
row[i] = t
columns[i] = t
}
}
}
return row
return columns
}
func (e *SelectExecutor) processAggregates(results [][]interface{}, columnNames []string, call *influxql.Call) ([][]interface{}, error) {