Return the correct auxiliary values for top/bottom

When `top()` or `bottom()` were used and selected auxiliary values, they
would return the wrong values that would be equal to the last point
selected. This is because the aggregators saved the memory address of
the auxiliary fields instead of copying them over. Since the same
auxiliary fields memory location is used for every value returned by the
storage engine, this resulted in the values being incorrect because they
were overwritten with incorrect values.

This fixes that so the auxiliary fields are copied out when they are
saved rather than only the memory location.
pull/9858/head
Jonathan A. Sternberg 2018-05-17 10:25:40 -05:00
parent 1a8931af42
commit 8a2bc63d3c
5 changed files with 283 additions and 40 deletions

View File

@ -1882,11 +1882,14 @@ func (r *FloatTopReducer) AggregateFloat(p *FloatPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)
var clone FloatPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}
func (r *FloatTopReducer) Emit() []FloatPoint {
@ -1925,11 +1928,14 @@ func (r *IntegerTopReducer) AggregateInteger(p *IntegerPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)
var clone IntegerPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}
func (r *IntegerTopReducer) Emit() []IntegerPoint {
@ -1968,11 +1974,14 @@ func (r *UnsignedTopReducer) AggregateUnsigned(p *UnsignedPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)
var clone UnsignedPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}
func (r *UnsignedTopReducer) Emit() []UnsignedPoint {
@ -2011,11 +2020,14 @@ func (r *FloatBottomReducer) AggregateFloat(p *FloatPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)
var clone FloatPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}
func (r *FloatBottomReducer) Emit() []FloatPoint {
@ -2054,11 +2066,14 @@ func (r *IntegerBottomReducer) AggregateInteger(p *IntegerPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)
var clone IntegerPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}
func (r *IntegerBottomReducer) Emit() []IntegerPoint {
@ -2097,11 +2112,14 @@ func (r *UnsignedBottomReducer) AggregateUnsigned(p *UnsignedPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)
var clone UnsignedPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}
func (r *UnsignedBottomReducer) Emit() []UnsignedPoint {

View File

@ -1520,6 +1520,7 @@ type FloatIterator struct {
Closed bool
Delay time.Duration
stats query.IteratorStats
point query.FloatPoint
}
func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats }
@ -1549,7 +1550,20 @@ func (itr *FloatIterator) Next() (*query.FloatPoint, error) {
}
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil
// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}
func FloatIterators(inputs []*FloatIterator) []query.Iterator {
@ -1565,6 +1579,7 @@ type IntegerIterator struct {
Points []query.IntegerPoint
Closed bool
stats query.IteratorStats
point query.IntegerPoint
}
func (itr *IntegerIterator) Stats() query.IteratorStats { return itr.stats }
@ -1578,7 +1593,20 @@ func (itr *IntegerIterator) Next() (*query.IntegerPoint, error) {
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil
// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}
func IntegerIterators(inputs []*IntegerIterator) []query.Iterator {
@ -1594,6 +1622,7 @@ type UnsignedIterator struct {
Points []query.UnsignedPoint
Closed bool
stats query.IteratorStats
point query.UnsignedPoint
}
func (itr *UnsignedIterator) Stats() query.IteratorStats { return itr.stats }
@ -1607,7 +1636,20 @@ func (itr *UnsignedIterator) Next() (*query.UnsignedPoint, error) {
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil
// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}
func UnsignedIterators(inputs []*UnsignedIterator) []query.Iterator {
@ -1623,6 +1665,7 @@ type StringIterator struct {
Points []query.StringPoint
Closed bool
stats query.IteratorStats
point query.StringPoint
}
func (itr *StringIterator) Stats() query.IteratorStats { return itr.stats }
@ -1636,7 +1679,20 @@ func (itr *StringIterator) Next() (*query.StringPoint, error) {
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil
// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}
func StringIterators(inputs []*StringIterator) []query.Iterator {
@ -1652,6 +1708,7 @@ type BooleanIterator struct {
Points []query.BooleanPoint
Closed bool
stats query.IteratorStats
point query.BooleanPoint
}
func (itr *BooleanIterator) Stats() query.IteratorStats { return itr.stats }
@ -1665,7 +1722,20 @@ func (itr *BooleanIterator) Next() (*query.BooleanPoint, error) {
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil
// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}
func BooleanIterators(inputs []*BooleanIterator) []query.Iterator {

View File

@ -61,9 +61,13 @@ func (v *FloatPoint) Clone() *FloatPoint {
// CopyTo makes a deep copy into the point.
func (v *FloatPoint) CopyTo(other *FloatPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
@ -282,9 +286,13 @@ func (v *IntegerPoint) Clone() *IntegerPoint {
// CopyTo makes a deep copy into the point.
func (v *IntegerPoint) CopyTo(other *IntegerPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
@ -503,9 +511,13 @@ func (v *UnsignedPoint) Clone() *UnsignedPoint {
// CopyTo makes a deep copy into the point.
func (v *UnsignedPoint) CopyTo(other *UnsignedPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
@ -722,9 +734,13 @@ func (v *StringPoint) Clone() *StringPoint {
// CopyTo makes a deep copy into the point.
func (v *StringPoint) CopyTo(other *StringPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
@ -943,9 +959,13 @@ func (v *BooleanPoint) Clone() *BooleanPoint {
// CopyTo makes a deep copy into the point.
func (v *BooleanPoint) CopyTo(other *BooleanPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}

View File

@ -57,9 +57,13 @@ func (v *{{.Name}}Point) Clone() *{{.Name}}Point {
// CopyTo makes a deep copy into the point.
func (v *{{.Name}}Point) CopyTo(other *{{.Name}}Point) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}

View File

@ -19,14 +19,15 @@ const Second = int64(time.Second)
func TestSelect(t *testing.T) {
for _, tt := range []struct {
name string
q string
typ influxql.DataType
expr string
itrs []query.Iterator
rows []query.Row
now time.Time
err string
name string
q string
typ influxql.DataType
fields map[string]influxql.DataType
expr string
itrs []query.Iterator
rows []query.Row
now time.Time
err string
}{
{
name: "Min",
@ -827,6 +828,69 @@ func TestSelect(t *testing.T) {
{Time: 31 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("region=west")}, Values: []interface{}{uint64(100), "A"}},
},
},
{
name: "Top_AuxFields_Float",
q: `SELECT top(p1, 2), p2, p3 FROM cpu`,
fields: map[string]influxql.DataType{
"p1": influxql.Float,
"p2": influxql.Float,
"p3": influxql.Float,
},
itrs: []query.Iterator{
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 1, Aux: []interface{}{float64(2), "aaa"}},
{Name: "cpu", Time: 1 * Second, Value: 2, Aux: []interface{}{float64(3), "bbb"}},
{Name: "cpu", Time: 2 * Second, Value: 3, Aux: []interface{}{float64(4), "ccc"}},
{Name: "cpu", Time: 3 * Second, Value: 4, Aux: []interface{}{float64(5), "ddd"}},
}},
},
rows: []query.Row{
{Time: 2 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(3), float64(4), "ccc"}},
{Time: 3 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(4), float64(5), "ddd"}},
},
},
{
name: "Top_AuxFields_Integer",
q: `SELECT top(p1, 2), p2, p3 FROM cpu`,
fields: map[string]influxql.DataType{
"p1": influxql.Integer,
"p2": influxql.Integer,
"p3": influxql.Integer,
},
itrs: []query.Iterator{
&IntegerIterator{Points: []query.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 1, Aux: []interface{}{int64(2), "aaa"}},
{Name: "cpu", Time: 1 * Second, Value: 2, Aux: []interface{}{int64(3), "bbb"}},
{Name: "cpu", Time: 2 * Second, Value: 3, Aux: []interface{}{int64(4), "ccc"}},
{Name: "cpu", Time: 3 * Second, Value: 4, Aux: []interface{}{int64(5), "ddd"}},
}},
},
rows: []query.Row{
{Time: 2 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(3), int64(4), "ccc"}},
{Time: 3 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(4), int64(5), "ddd"}},
},
},
{
name: "Top_AuxFields_Unsigned",
q: `SELECT top(p1, 2), p2, p3 FROM cpu`,
fields: map[string]influxql.DataType{
"p1": influxql.Unsigned,
"p2": influxql.Unsigned,
"p3": influxql.Unsigned,
},
itrs: []query.Iterator{
&UnsignedIterator{Points: []query.UnsignedPoint{
{Name: "cpu", Time: 0 * Second, Value: 1, Aux: []interface{}{uint64(2), "aaa"}},
{Name: "cpu", Time: 1 * Second, Value: 2, Aux: []interface{}{uint64(3), "bbb"}},
{Name: "cpu", Time: 2 * Second, Value: 3, Aux: []interface{}{uint64(4), "ccc"}},
{Name: "cpu", Time: 3 * Second, Value: 4, Aux: []interface{}{uint64(5), "ddd"}},
}},
},
rows: []query.Row{
{Time: 2 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{uint64(3), uint64(4), "ccc"}},
{Time: 3 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{uint64(4), uint64(5), "ddd"}},
},
},
{
name: "Bottom_NoTags_Float",
q: `SELECT bottom(value::float, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`,
@ -1104,6 +1168,69 @@ func TestSelect(t *testing.T) {
{Time: 50 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("region=west")}, Values: []interface{}{uint64(1), "B"}},
},
},
{
name: "Bottom_AuxFields_Float",
q: `SELECT bottom(p1, 2), p2, p3 FROM cpu`,
fields: map[string]influxql.DataType{
"p1": influxql.Float,
"p2": influxql.Float,
"p3": influxql.Float,
},
itrs: []query.Iterator{
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 1, Aux: []interface{}{float64(2), "aaa"}},
{Name: "cpu", Time: 1 * Second, Value: 2, Aux: []interface{}{float64(3), "bbb"}},
{Name: "cpu", Time: 2 * Second, Value: 3, Aux: []interface{}{float64(4), "ccc"}},
{Name: "cpu", Time: 3 * Second, Value: 4, Aux: []interface{}{float64(5), "ddd"}},
}},
},
rows: []query.Row{
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(1), float64(2), "aaa"}},
{Time: 1 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(2), float64(3), "bbb"}},
},
},
{
name: "Bottom_AuxFields_Integer",
q: `SELECT bottom(p1, 2), p2, p3 FROM cpu`,
fields: map[string]influxql.DataType{
"p1": influxql.Integer,
"p2": influxql.Integer,
"p3": influxql.Integer,
},
itrs: []query.Iterator{
&IntegerIterator{Points: []query.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 1, Aux: []interface{}{int64(2), "aaa"}},
{Name: "cpu", Time: 1 * Second, Value: 2, Aux: []interface{}{int64(3), "bbb"}},
{Name: "cpu", Time: 2 * Second, Value: 3, Aux: []interface{}{int64(4), "ccc"}},
{Name: "cpu", Time: 3 * Second, Value: 4, Aux: []interface{}{int64(5), "ddd"}},
}},
},
rows: []query.Row{
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(1), int64(2), "aaa"}},
{Time: 1 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(2), int64(3), "bbb"}},
},
},
{
name: "Bottom_AuxFields_Unsigned",
q: `SELECT bottom(p1, 2), p2, p3 FROM cpu`,
fields: map[string]influxql.DataType{
"p1": influxql.Unsigned,
"p2": influxql.Unsigned,
"p3": influxql.Unsigned,
},
itrs: []query.Iterator{
&UnsignedIterator{Points: []query.UnsignedPoint{
{Name: "cpu", Time: 0 * Second, Value: 1, Aux: []interface{}{uint64(2), "aaa"}},
{Name: "cpu", Time: 1 * Second, Value: 2, Aux: []interface{}{uint64(3), "bbb"}},
{Name: "cpu", Time: 2 * Second, Value: 3, Aux: []interface{}{uint64(4), "ccc"}},
{Name: "cpu", Time: 3 * Second, Value: 4, Aux: []interface{}{uint64(5), "ddd"}},
}},
},
rows: []query.Row{
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{uint64(1), uint64(2), "aaa"}},
{Time: 1 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{uint64(2), uint64(3), "bbb"}},
},
},
{
name: "Fill_Null_Float",
q: `SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(null)`,
@ -2683,10 +2810,14 @@ func TestSelect(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
shardMapper := ShardMapper{
MapShardsFn: func(sources influxql.Sources, _ influxql.TimeRange) query.ShardGroup {
var fields map[string]influxql.DataType
if tt.typ != influxql.Unknown {
fields = map[string]influxql.DataType{"value": tt.typ}
} else {
fields = tt.fields
}
return &ShardGroup{
Fields: map[string]influxql.DataType{
"value": tt.typ,
},
Fields: fields,
Dimensions: []string{"host", "region"},
CreateIteratorFn: func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
if m.Name != "cpu" {