Merge pull request #16600 from influxdata/er-tsm-block-fix

fix(storage): ensure all block data returned
pull/16605/head
Edd Robinson 2020-01-21 11:28:26 +00:00 committed by GitHub
commit aa8b12f2aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 241 additions and 12 deletions

View File

@ -105,8 +105,10 @@ func (c *floatArrayAscendingCursor) Next() *tsdb.FloatArray {
if pos < len(c.res.Timestamps) {
if c.tsm.pos < len(tvals.Timestamps) {
if pos == 0 {
// optimization: all points served from TSM data
if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
// optimization: all points can be served from TSM data because
// we need the entire block and the block completely fits within
// the buffer.
copy(c.res.Timestamps, tvals.Timestamps)
pos += copy(c.res.Values, tvals.Values)
c.nextTSM()
@ -413,8 +415,10 @@ func (c *integerArrayAscendingCursor) Next() *tsdb.IntegerArray {
if pos < len(c.res.Timestamps) {
if c.tsm.pos < len(tvals.Timestamps) {
if pos == 0 {
// optimization: all points served from TSM data
if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
// optimization: all points can be served from TSM data because
// we need the entire block and the block completely fits within
// the buffer.
copy(c.res.Timestamps, tvals.Timestamps)
pos += copy(c.res.Values, tvals.Values)
c.nextTSM()
@ -721,8 +725,10 @@ func (c *unsignedArrayAscendingCursor) Next() *tsdb.UnsignedArray {
if pos < len(c.res.Timestamps) {
if c.tsm.pos < len(tvals.Timestamps) {
if pos == 0 {
// optimization: all points served from TSM data
if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
// optimization: all points can be served from TSM data because
// we need the entire block and the block completely fits within
// the buffer.
copy(c.res.Timestamps, tvals.Timestamps)
pos += copy(c.res.Values, tvals.Values)
c.nextTSM()
@ -1029,8 +1035,10 @@ func (c *stringArrayAscendingCursor) Next() *tsdb.StringArray {
if pos < len(c.res.Timestamps) {
if c.tsm.pos < len(tvals.Timestamps) {
if pos == 0 {
// optimization: all points served from TSM data
if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
// optimization: all points can be served from TSM data because
// we need the entire block and the block completely fits within
// the buffer.
copy(c.res.Timestamps, tvals.Timestamps)
pos += copy(c.res.Values, tvals.Values)
c.nextTSM()
@ -1341,8 +1349,10 @@ func (c *booleanArrayAscendingCursor) Next() *tsdb.BooleanArray {
if pos < len(c.res.Timestamps) {
if c.tsm.pos < len(tvals.Timestamps) {
if pos == 0 {
// optimization: all points served from TSM data
if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
// optimization: all points can be served from TSM data because
// we need the entire block and the block completely fits within
// the buffer.
copy(c.res.Timestamps, tvals.Timestamps)
pos += copy(c.res.Values, tvals.Values)
c.nextTSM()

View File

@ -104,8 +104,10 @@ func (c *{{$type}}) Next() {{$arrayType}} {
if pos < len(c.res.Timestamps) {
if c.tsm.pos < len(tvals.Timestamps) {
if pos == 0 {
// optimization: all points served from TSM data
if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps){
// optimization: all points can be served from TSM data because
// we need the entire block and the block completely fits within
// the buffer.
copy(c.res.Timestamps, tvals.Timestamps)
pos += copy(c.res.Values, tvals.Values)
c.nextTSM()

View File

@ -6,10 +6,13 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/pkg/fs"
"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/assert"
)
type keyValues struct {
@ -133,3 +136,217 @@ func TestFileStore_DuplicatePoints(t *testing.T) {
}
})
}
// Int64Slice is a []int64 that implements sort.Interface, ordering its
// elements from smallest to largest.
type Int64Slice []int64

// Len reports how many elements the slice holds.
func (s Int64Slice) Len() int {
	return len(s)
}

// Less reports whether the element at index i must sort before the element
// at index j.
func (s Int64Slice) Less(i, j int) bool {
	return s[j] > s[i]
}

// Swap exchanges the elements at indexes i and j.
func (s Int64Slice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
// Verifies the array cursors correctly handle merged blocks from KeyCursor which may exceed the
// array cursor's local values buffer, which is initialized to MaxPointsPerBlock elements (1000).
//
// This test creates two TSM files which have a single block each. The second file
// has interleaving timestamps with the first file.
//
// The first file has a block of 800 timestamps starting at 1000 and increasing by 10ns.
// The second file has a block of 400 timestamps starting at 1005, also increasing by 10ns.
//
// When calling `nextTSM`, a single block of 1200 timestamps will be returned and the
// array cursor must chunk the values across successive Next calls.
func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.T) {
	// key is the series key written to both files and read back by the cursors.
	const key = "m,_field=v#!~#v"

	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// Named store (not fs) so the imported pkg/fs package is not shadowed.
	store := NewFileStore(dir)

	// makeVals creates count points starting at ts and incrementing by step.
	makeVals := func(ts, count, step int64) []Value {
		vals := make([]Value, count)
		for i := range vals {
			vals[i] = NewFloatValue(ts, 1.01)
			ts += step
		}
		return vals
	}

	// makeTs creates count timestamps starting at ts and incrementing by step.
	makeTs := func(ts, count, step int64) []int64 {
		vals := make([]int64, count)
		for i := range vals {
			vals[i] = ts
			ts += step
		}
		return vals
	}

	// Setup 2 files with the second containing a single block that is completely within the first.
	data := []keyValues{
		{key, makeVals(1000, 800, 10)},
		{key, makeVals(1005, 400, 10)},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	// A failed Replace would leave the store empty and surface later as
	// misleading cursor assertion failures, so stop here instead.
	if err := store.Replace(nil, files); err != nil {
		t.Fatalf("unexpected error replacing files: %v", err)
	}

	t.Run("ascending", func(t *testing.T) {
		const start, end = 1000, 10000
		kc := store.KeyCursor(context.Background(), []byte(key), start, true)
		defer kc.Close()
		cur := newFloatArrayAscendingCursor()
		cur.reset(start, end, nil, kc)

		// Expected output is the union of both files' timestamps in ascending order.
		exp := makeTs(1000, 800, 10)
		exp = append(exp, makeTs(1005, 400, 10)...)
		sort.Sort(Int64Slice(exp))

		// check first block
		ar := cur.Next()
		assert.Len(t, ar.Timestamps, 1000)
		assert.Equal(t, exp[:1000], ar.Timestamps)

		// check second block
		exp = exp[1000:]
		ar = cur.Next()
		assert.Len(t, ar.Timestamps, 200)
		assert.Equal(t, exp, ar.Timestamps)
	})

	t.Run("descending", func(t *testing.T) {
		const start, end = 10000, 0
		kc := store.KeyCursor(context.Background(), []byte(key), start, false)
		defer kc.Close()
		cur := newFloatArrayDescendingCursor()
		cur.reset(start, end, nil, kc)

		// Expected output is the union of both files' timestamps in descending order.
		exp := makeTs(1000, 800, 10)
		exp = append(exp, makeTs(1005, 400, 10)...)
		sort.Sort(sort.Reverse(Int64Slice(exp)))

		// check first block
		ar := cur.Next()
		assert.Len(t, ar.Timestamps, 1000)
		assert.Equal(t, exp[:1000], ar.Timestamps)

		// check second block
		exp = exp[1000:]
		ar = cur.Next()
		assert.Len(t, ar.Timestamps, 200)
		assert.Equal(t, exp, ar.Timestamps)
	})
}
// FloatArray wraps a *tsdb.FloatArray and adds the Less and Swap methods
// needed to sort it by timestamp in increasing order. Len comes from the
// embedded array.
type FloatArray struct {
	*tsdb.FloatArray
}

// Less reports whether the timestamp at index i is smaller than the timestamp
// at index j.
func (fa *FloatArray) Less(i, j int) bool {
	ts := fa.Timestamps
	return ts[i] < ts[j]
}

// Swap exchanges the entries at indexes i and j, moving the timestamp and its
// value together so each pair stays aligned.
func (fa *FloatArray) Swap(i, j int) {
	ts := fa.Timestamps
	ts[i], ts[j] = ts[j], ts[i]

	vs := fa.Values
	vs[i], vs[j] = vs[j], vs[i]
}
// Verifies the array cursors correctly handle merged blocks from KeyCursor which may exceed the
// array cursor's local values buffer, which is initialized to MaxPointsPerBlock elements (1000).
//
// This test creates two TSM files with a significant number of interleaved points in addition
// to a significant number of points in the second file which replace values in the first.
// To verify intersecting data from the second file replaces the first, the values differ,
// so the enumerated results can be compared with the expected output.
func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing.T) {
	// key is the series key written to both files and read back by the cursors.
	const key = "m,_field=v#!~#v"

	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// Named store (not fs) so the imported pkg/fs package is not shadowed.
	store := NewFileStore(dir)

	// makeVals creates count points with value v, starting at ts and incrementing by step.
	makeVals := func(ts, count, step int64, v float64) []Value {
		vals := make([]Value, count)
		for i := range vals {
			vals[i] = NewFloatValue(ts, v)
			ts += step
		}
		return vals
	}

	// makeArray builds the same series as makeVals, but as a *tsdb.FloatArray
	// so it can be merged and compared against the cursor output.
	makeArray := func(ts, count, step int64, v float64) *tsdb.FloatArray {
		ar := tsdb.NewFloatArrayLen(int(count))
		for i := range ar.Timestamps {
			ar.Timestamps[i] = ts
			ar.Values[i] = v
			ts += step
		}
		return ar
	}

	// collect calls next repeatedly, concatenating every returned batch until
	// an empty batch is returned. capacity pre-sizes the result.
	collect := func(next func() *tsdb.FloatArray, capacity int) *tsdb.FloatArray {
		got := tsdb.NewFloatArrayLen(capacity)
		got.Timestamps = got.Timestamps[:0]
		got.Values = got.Values[:0]
		for ar := next(); ar.Len() > 0; ar = next() {
			got.Timestamps = append(got.Timestamps, ar.Timestamps...)
			got.Values = append(got.Values, ar.Values...)
		}
		return got
	}

	// Setup 2 files with partially overlapping blocks and the second file replaces some elements of the first.
	data := []keyValues{
		{key, makeVals(1000, 3500, 10, 1.01)},
		{key, makeVals(4005, 3500, 5, 2.01)},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	// A failed Replace would leave the store empty and surface later as
	// misleading cursor assertion failures, so stop here instead.
	if err := store.Replace(nil, files); err != nil {
		t.Fatalf("unexpected error replacing files: %v", err)
	}

	t.Run("ascending", func(t *testing.T) {
		const start, end = 1000, 1e9
		kc := store.KeyCursor(context.Background(), []byte(key), start, true)
		defer kc.Close()
		cur := newFloatArrayAscendingCursor()
		cur.reset(start, end, nil, kc)

		exp := makeArray(1000, 3500, 10, 1.01)
		a2 := makeArray(4005, 3500, 5, 2.01)
		exp.Merge(a2)

		got := collect(cur.Next, exp.Len())

		// testify expects (expected, actual); keeping that order makes the
		// failure output label the two sides correctly.
		assert.Len(t, got.Timestamps, exp.Len())
		assert.Equal(t, exp.Timestamps, got.Timestamps)
		assert.Equal(t, exp.Values, got.Values)
	})

	t.Run("descending", func(t *testing.T) {
		const start, end = 1e9, 0
		kc := store.KeyCursor(context.Background(), []byte(key), start, false)
		defer kc.Close()
		cur := newFloatArrayDescendingCursor()
		cur.reset(start, end, nil, kc)

		exp := makeArray(1000, 3500, 10, 1.01)
		a2 := makeArray(4005, 3500, 5, 2.01)
		exp.Merge(a2)
		// The merged expectation is then reversed to match the descending cursor.
		sort.Sort(sort.Reverse(&FloatArray{exp}))

		got := collect(cur.Next, exp.Len())

		assert.Len(t, got.Timestamps, exp.Len())
		assert.Equal(t, exp.Timestamps, got.Timestamps)
		assert.Equal(t, exp.Values, got.Values)
	})
}