Handle nil values from the tsm1 cursor correctly

Send nil values from the tsm1 cursor at the end of the cursor. After the
cursor reached tsm1, the `nextAt()` call would always return the default
value rather than a nil value.

Descending also didn't work correctly because the seeking functionality
for tsm1 iterators would always act like they were ascending instead of
descending when choosing which value to select. This resulted in very
strange output from the emitter since it couldn't figure out if it was
ascending or descending.

Fixes #6206.
pull/6222/head
Jonathan A. Sternberg 2016-04-05 12:49:20 -04:00
parent c2ac8c85b5
commit 94ec92d669
4 changed files with 118 additions and 30 deletions

View File

@ -837,7 +837,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
// Create cursor from field.
cur := e.buildCursor(mm.Name, seriesKey, opt.Aux[i], opt)
if cur != nil {
aux[i] = newBufCursor(cur)
aux[i] = newBufCursor(cur, opt.Ascending)
continue
}
@ -861,7 +861,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
if cur == nil {
return nil, nil
}
conds[i] = newBufCursor(cur)
conds[i] = newBufCursor(cur, opt.Ascending)
}
}

View File

@ -38,11 +38,12 @@ type bufCursor struct {
value interface{}
filled bool
}
ascending bool
}
// newBufCursor returns a bufferred wrapper for cur.
func newBufCursor(cur cursor) *bufCursor {
return &bufCursor{cur: cur}
func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending}
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
@ -74,14 +75,17 @@ func (c *bufCursor) peek() (k int64, v interface{}) {
func (c *bufCursor) nextAt(seek int64) interface{} {
for {
k, v := c.next()
if k == tsdb.EOF || k == seek {
return v
} else if k < seek {
continue
if k != tsdb.EOF {
if k == seek {
return v
} else if c.ascending && k < seek {
continue
} else if !c.ascending && k > seek {
continue
}
c.unread(k, v)
}
c.unread(k, v)
// Return "nil" value for type.
switch c.cur.(type) {
case floatCursor:
@ -160,8 +164,10 @@ func (itr *floatIterator) Next() *influxql.FloatPoint {
} else {
// Otherwise find lowest aux timestamp.
for i := range itr.aux {
if k, _ := itr.aux[i].peek(); k != tsdb.EOF && (seek == tsdb.EOF || k < seek) {
seek = k
if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
seek = k
}
}
}
itr.point.Time = seek
@ -526,8 +532,10 @@ func (itr *integerIterator) Next() *influxql.IntegerPoint {
} else {
// Otherwise find lowest aux timestamp.
for i := range itr.aux {
if k, _ := itr.aux[i].peek(); k != tsdb.EOF && (seek == tsdb.EOF || k < seek) {
seek = k
if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
seek = k
}
}
}
itr.point.Time = seek
@ -892,8 +900,10 @@ func (itr *stringIterator) Next() *influxql.StringPoint {
} else {
// Otherwise find lowest aux timestamp.
for i := range itr.aux {
if k, _ := itr.aux[i].peek(); k != tsdb.EOF && (seek == tsdb.EOF || k < seek) {
seek = k
if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
seek = k
}
}
}
itr.point.Time = seek
@ -1258,8 +1268,10 @@ func (itr *booleanIterator) Next() *influxql.BooleanPoint {
} else {
// Otherwise find lowest aux timestamp.
for i := range itr.aux {
if k, _ := itr.aux[i].peek(); k != tsdb.EOF && (seek == tsdb.EOF || k < seek) {
seek = k
if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
seek = k
}
}
}
itr.point.Time = seek

View File

@ -31,11 +31,12 @@ type bufCursor struct {
value interface{}
filled bool
}
ascending bool
}
// newBufCursor returns a bufferred wrapper for cur.
func newBufCursor(cur cursor) *bufCursor {
return &bufCursor{cur: cur}
func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending}
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
@ -67,14 +68,17 @@ func (c *bufCursor) peek() (k int64, v interface{}) {
func (c *bufCursor) nextAt(seek int64) interface{} {
for {
k, v := c.next()
if k == tsdb.EOF || k == seek {
return v
} else if k < seek {
continue
if k != tsdb.EOF {
if k == seek {
return v
} else if c.ascending && k < seek {
continue
} else if !c.ascending && k > seek {
continue
}
c.unread(k, v)
}
c.unread(k, v)
// Return "nil" value for type.
switch c.cur.(type) {
case floatCursor:
@ -156,8 +160,10 @@ func (itr *{{.name}}Iterator) Next() *influxql.{{.Name}}Point {
} else {
// Otherwise find lowest aux timestamp.
for i := range itr.aux {
if k, _ := itr.aux[i].peek(); k != tsdb.EOF && (seek == tsdb.EOF || k < seek) {
seek = k
if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
seek = k
}
}
}
itr.point.Time = seek

View File

@ -153,7 +153,7 @@ func TestShardWriteAddNewField(t *testing.T) {
}
// Ensure a shard can create iterators for its underlying data.
func TestShard_CreateIterator(t *testing.T) {
func TestShard_CreateIterator_Ascending(t *testing.T) {
sh := NewShard()
// Calling CreateIterator when the engine is not open will return
@ -216,10 +216,80 @@ cpu,host=serverB,region=uswest value=25 0
Tags: influxql.NewTags(map[string]string{"host": "serverB"}),
Time: time.Unix(0, 0).UnixNano(),
Value: 25,
Aux: []interface{}{float64(0)},
Aux: []interface{}{(*float64)(nil)},
}) {
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
}
}
// Ensure a shard can create iterators for its underlying data.
func TestShard_CreateIterator_Descending(t *testing.T) {
sh := NewShard()
// Calling CreateIterator when the engine is not open will return
// ErrEngineClosed.
_, got := sh.CreateIterator(influxql.IteratorOptions{})
if exp := tsdb.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
if err := sh.Open(); err != nil {
t.Fatal(err)
}
defer sh.Close()
sh.MustWritePointsString(`
cpu,host=serverA,region=uswest value=100 0
cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)
// Create iterator.
itr, err := sh.CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Aux: []string{"val2"},
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: false,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
}
defer itr.Close()
fitr := itr.(influxql.FloatIterator)
// Read values from iterator.
if p := fitr.Next(); !deep.Equal(p, &influxql.FloatPoint{
Name: "cpu",
Tags: influxql.NewTags(map[string]string{"host": "serverB"}),
Time: time.Unix(0, 0).UnixNano(),
Value: 25,
Aux: []interface{}{(*float64)(nil)},
}) {
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
}
if p := fitr.Next(); !deep.Equal(p, &influxql.FloatPoint{
Name: "cpu",
Tags: influxql.NewTags(map[string]string{"host": "serverA"}),
Time: time.Unix(10, 0).UnixNano(),
Value: 50,
Aux: []interface{}{float64(5)},
}) {
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
}
if p := fitr.Next(); !deep.Equal(p, &influxql.FloatPoint{
Name: "cpu",
Tags: influxql.NewTags(map[string]string{"host": "serverA"}),
Time: time.Unix(0, 0).UnixNano(),
Value: 100,
Aux: []interface{}{(*float64)(nil)},
}) {
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
}
}
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) }