fix(fill): fill resets the previous value when a new series or window is encountered (#13459)
parent
db762e277b
commit
3372d3b878
|
@ -220,6 +220,12 @@ func (cur *scannerCursorBase) Columns() []influxql.VarRef {
|
|||
return cur.columns
|
||||
}
|
||||
|
||||
func (cur *scannerCursorBase) clear(m map[string]interface{}) {
|
||||
for k := range m {
|
||||
delete(m, k)
|
||||
}
|
||||
}
|
||||
|
||||
var _ Cursor = (*scannerCursor)(nil)
|
||||
|
||||
type scannerCursor struct {
|
||||
|
@ -235,6 +241,10 @@ func newScannerCursor(s IteratorScanner, fields []*influxql.Field, opt IteratorO
|
|||
|
||||
func (s *scannerCursor) scan(m map[string]interface{}) (int64, string, Tags) {
|
||||
ts, name, tags := s.scanner.Peek()
|
||||
// if a new series, clear the map of previous values
|
||||
if name != s.series.Name || tags.ID() != s.series.Tags.ID() {
|
||||
s.clear(m)
|
||||
}
|
||||
if ts == ZeroTime {
|
||||
return ts, name, tags
|
||||
}
|
||||
|
@ -304,7 +314,10 @@ func (cur *multiScannerCursor) scan(m map[string]interface{}) (ts int64, name st
|
|||
if ts == ZeroTime {
|
||||
return ts, name, tags
|
||||
}
|
||||
|
||||
// if a new series, clear the map of previous values
|
||||
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
|
||||
cur.clear(m)
|
||||
}
|
||||
for _, s := range cur.scanners {
|
||||
s.ScanAt(ts, name, tags, m)
|
||||
}
|
||||
|
|
|
@ -1288,6 +1288,34 @@ func TestSelect(t *testing.T) {
|
|||
{Time: 50 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(2)}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Fill_Previous_Float_Two_Series",
|
||||
q: `SELECT last(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(previous)`,
|
||||
typ: influxql.Float,
|
||||
expr: `last(value::float)`,
|
||||
itrs: []query.Iterator{
|
||||
&FloatIterator{Points: []query.FloatPoint{
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 20},
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Value: 30},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 1},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Value: 2},
|
||||
}},
|
||||
},
|
||||
rows: []query.Row{
|
||||
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{nil}},
|
||||
{Time: 10 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{nil}},
|
||||
{Time: 20 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{nil}},
|
||||
{Time: 30 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(20)}},
|
||||
{Time: 40 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(30)}},
|
||||
{Time: 50 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(30)}},
|
||||
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{nil}},
|
||||
{Time: 10 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{nil}},
|
||||
{Time: 20 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{nil}},
|
||||
{Time: 30 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(1)}},
|
||||
{Time: 40 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(2)}},
|
||||
{Time: 50 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(2)}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Fill_Linear_Float_One",
|
||||
q: `SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`,
|
||||
|
|
Loading…
Reference in New Issue