From abfe5a54a0b419d0231762574229caddf971bbfd Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 3 Apr 2020 13:55:50 -0600 Subject: [PATCH 1/2] fix(storage): Fix query cursor read that caused cursor truncation. Filter cursors buffer points in between calls to Next() if the number of read points exceeds 1000. Previously, this buffer was being cleared out before being iterated over which caused queries to return a resultset which had a number of rows divisable by 1000. This change moves the clearing of the buffer until after the points have been read. This change affects any queries which read more than 1000 points from a single series & have a filter that can be successfully applied to at least one of those points. --- storage/reads/array_cursor.gen.go | 40 +++++++++++++++++++------- storage/reads/array_cursor.gen.go.tmpl | 8 ++++-- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index 9ef41c3950..8444d92252 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -55,8 +55,6 @@ func (c *floatArrayFilterCursor) Next() *cursors.FloatArray { if c.tmp.Len() > 0 { a = c.tmp - c.tmp.Timestamps = nil - c.tmp.Values = nil } else { a = c.FloatArrayCursor.Next() } @@ -76,6 +74,12 @@ LOOP: } } } + + // Clear bufferred timestamps & values if we make it through a cursor. + // The break above will skip this if a cursor is partially read. + c.tmp.Timestamps = nil + c.tmp.Values = nil + a = c.FloatArrayCursor.Next() } @@ -271,8 +275,6 @@ func (c *integerArrayFilterCursor) Next() *cursors.IntegerArray { if c.tmp.Len() > 0 { a = c.tmp - c.tmp.Timestamps = nil - c.tmp.Values = nil } else { a = c.IntegerArrayCursor.Next() } @@ -292,6 +294,12 @@ LOOP: } } } + + // Clear bufferred timestamps & values if we make it through a cursor. + // The break above will skip this if a cursor is partially read. + c.tmp.Timestamps = nil + c.tmp.Values = nil + a = c.IntegerArrayCursor.Next() } @@ -487,8 +495,6 @@ func (c *unsignedArrayFilterCursor) Next() *cursors.UnsignedArray { if c.tmp.Len() > 0 { a = c.tmp - c.tmp.Timestamps = nil - c.tmp.Values = nil } else { a = c.UnsignedArrayCursor.Next() } @@ -508,6 +514,12 @@ LOOP: } } } + + // Clear bufferred timestamps & values if we make it through a cursor. + // The break above will skip this if a cursor is partially read. + c.tmp.Timestamps = nil + c.tmp.Values = nil + a = c.UnsignedArrayCursor.Next() } @@ -703,8 +715,6 @@ func (c *stringArrayFilterCursor) Next() *cursors.StringArray { if c.tmp.Len() > 0 { a = c.tmp - c.tmp.Timestamps = nil - c.tmp.Values = nil } else { a = c.StringArrayCursor.Next() } @@ -724,6 +734,12 @@ LOOP: } } } + + // Clear bufferred timestamps & values if we make it through a cursor. + // The break above will skip this if a cursor is partially read. + c.tmp.Timestamps = nil + c.tmp.Values = nil + a = c.StringArrayCursor.Next() } @@ -879,8 +895,6 @@ func (c *booleanArrayFilterCursor) Next() *cursors.BooleanArray { if c.tmp.Len() > 0 { a = c.tmp - c.tmp.Timestamps = nil - c.tmp.Values = nil } else { a = c.BooleanArrayCursor.Next() } @@ -900,6 +914,12 @@ LOOP: } } } + + // Clear bufferred timestamps & values if we make it through a cursor. + // The break above will skip this if a cursor is partially read. + c.tmp.Timestamps = nil + c.tmp.Values = nil + a = c.BooleanArrayCursor.Next() } diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index 931b86380f..c8b9597e07 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -54,8 +54,6 @@ func (c *{{$type}}) Next() {{$arrayType}} { if c.tmp.Len() > 0 { a = c.tmp - c.tmp.Timestamps = nil - c.tmp.Values = nil } else { a = c.{{.Name}}ArrayCursor.Next() } @@ -75,6 +73,12 @@ LOOP: } } } + + // Clear bufferred timestamps & values if we make it through a cursor. + // The break above will skip this if a cursor is partially read. + c.tmp.Timestamps = nil + c.tmp.Values = nil + a = c.{{.Name}}ArrayCursor.Next() } From e639f99d0354f8c087717a86da72522690bd41c7 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 6 Apr 2020 14:17:22 -0600 Subject: [PATCH 2/2] fix(storage): Add filter regression test --- storage/reads/array_cursor_test.go | 58 ++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 storage/reads/array_cursor_test.go diff --git a/storage/reads/array_cursor_test.go b/storage/reads/array_cursor_test.go new file mode 100644 index 0000000000..71bf3bbac6 --- /dev/null +++ b/storage/reads/array_cursor_test.go @@ -0,0 +1,58 @@ +package reads + +import ( + "testing" + + "github.com/influxdata/influxdb/v2/tsdb/cursors" +) + +func TestIntegerFilterArrayCursor(t *testing.T) { + var i int + expr := MockExpression{ + EvalBoolFunc: func(v Valuer) bool { + i++ + return i%2 == 0 + }, + } + + var resultN int + ac := MockIntegerArrayCursor{ + CloseFunc: func() {}, + ErrFunc: func() error { return nil }, + StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} }, + NextFunc: func() *cursors.IntegerArray { + resultN++ + if resultN == 4 { + return cursors.NewIntegerArrayLen(0) + } + return cursors.NewIntegerArrayLen(900) + }, + } + + c := newIntegerFilterArrayCursor(&expr) + c.reset(&ac) + + if got, want := len(c.Next().Timestamps), 1000; got != want { + t.Fatalf("len(Next())=%d, want %d", got, want) + } else if got, want := len(c.Next().Timestamps), 350; got != want { + t.Fatalf("len(Next())=%d, want %d", got, want) + } +} + +type MockIntegerArrayCursor struct { + CloseFunc func() + ErrFunc func() error + StatsFunc func() cursors.CursorStats + NextFunc func() *cursors.IntegerArray +} + +func (c *MockIntegerArrayCursor) Close() { c.CloseFunc() } +func (c *MockIntegerArrayCursor) Err() error { return c.ErrFunc() } +func (c *MockIntegerArrayCursor) Stats() cursors.CursorStats { return c.StatsFunc() } +func (c *MockIntegerArrayCursor) Next() *cursors.IntegerArray { return c.NextFunc() } + +type MockExpression struct { + EvalBoolFunc func(v Valuer) bool +} + +func (e *MockExpression) EvalBool(v Valuer) bool { return e.EvalBoolFunc(v) }