Merge pull request #5996 from influxdata/jw-dup-block

Fix skipping blocks at query time when overlaps exist
pull/6009/head
Jason Wilder 2016-03-14 13:36:27 -06:00
commit 667ada3906
2 changed files with 385 additions and 40 deletions

View File

@ -772,7 +772,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) {
// dedup them.
for i := 1; i < len(c.current); i++ {
cur := c.current[i]
if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadFloatBlockAt(cur.entry, nil)
@ -780,7 +780,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) {
return nil, err
}
values = append(values, v...)
} else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
} else if !c.ascending && !cur.read {
cur.read = true
c.pos--
@ -816,7 +816,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error)
// dedup them.
for i := 1; i < len(c.current); i++ {
cur := c.current[i]
if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadIntegerBlockAt(cur.entry, nil)
@ -824,7 +824,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error)
return nil, err
}
values = append(values, v...)
} else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
} else if !c.ascending && !cur.read {
cur.read = true
c.pos--
@ -860,7 +860,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) {
// dedup them.
for i := 1; i < len(c.current); i++ {
cur := c.current[i]
if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadStringBlockAt(cur.entry, nil)
@ -868,7 +868,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) {
return nil, err
}
values = append(values, v...)
} else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
} else if !c.ascending && !cur.read {
cur.read = true
c.pos--
@ -904,7 +904,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error)
// dedup them.
for i := 1; i < len(c.current); i++ {
cur := c.current[i]
if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadBooleanBlockAt(cur.entry, nil)
@ -912,7 +912,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error)
return nil, err
}
values = append(values, v...)
} else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
} else if !c.ascending && !cur.read {
cur.read = true
c.pos--

View File

@ -109,12 +109,15 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := data[1]
if got, exp := len(values), len(exp.values); got != exp {
exp := []tsm1.Value{
data[1].values[0],
data[3].values[0],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
@ -126,15 +129,10 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp = data[3]
if got, exp := len(values), len(exp.values); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
exp = nil
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
}
@ -170,7 +168,191 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
for i, v := range exp.values {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, got, exp)
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 0.0), tsm1.NewValue(1, 1.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 4.0), tsm1.NewValue(7, 7.0)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
// Search for an entry that exists in the second file
buf := make(tsm1.FloatValues, 1000)
c := fs.KeyCursor("cpu", 0, true)
values, err := c.ReadFloatBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[3].values[0],
data[0].values[1],
data[1].values[0],
data[2].values[0],
data[3].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(0)), tsm1.NewValue(1, int64(1))}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(4)), tsm1.NewValue(7, int64(7))}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
// Search for an entry that exists in the second file
buf := make(tsm1.IntegerValues, 1000)
c := fs.KeyCursor("cpu", 0, true)
values, err := c.ReadIntegerBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[3].values[0],
data[0].values[1],
data[1].values[0],
data[2].values[0],
data[3].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, true), tsm1.NewValue(1, false)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, false), tsm1.NewValue(7, true)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
// Search for an entry that exists in the second file
buf := make(tsm1.BooleanValues, 1000)
c := fs.KeyCursor("cpu", 0, true)
values, err := c.ReadBooleanBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[3].values[0],
data[0].values[1],
data[1].values[0],
data[2].values[0],
data[3].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "zero"), tsm1.NewValue(1, "one")}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "four"), tsm1.NewValue(7, "seven")}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
// Search for an entry that exists in the second file
buf := make(tsm1.StringValues, 1000)
c := fs.KeyCursor("cpu", 0, true)
values, err := c.ReadStringBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[3].values[0],
data[0].values[1],
data[1].values[0],
data[2].values[0],
data[3].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
@ -310,34 +492,21 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := data[3]
if got, exp := len(values), len(exp.values); got != exp {
exp := []tsm1.Value{
data[1].values[0],
data[3].values[0],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
// Check that calling Next will dedupe points
c.Next()
values, err = c.ReadFloatBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp = data[1]
if got, exp := len(values), len(exp.values); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp.values {
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
fs := tsm1.NewFileStore("")
@ -374,6 +543,182 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
}
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, 0.0), tsm1.NewValue(9, 1.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 4.0), tsm1.NewValue(7, 7.0)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
buf := make(tsm1.FloatValues, 1000)
c := fs.KeyCursor("cpu", 8, false)
values, err := c.ReadFloatBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[1].values[0],
data[3].values[0],
data[3].values[1],
data[0].values[0],
data[0].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, int64(0)), tsm1.NewValue(9, int64(1))}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(4)), tsm1.NewValue(7, int64(7))}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
buf := make(tsm1.IntegerValues, 1000)
c := fs.KeyCursor("cpu", 8, false)
values, err := c.ReadIntegerBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[1].values[0],
data[3].values[0],
data[3].values[1],
data[0].values[0],
data[0].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, true), tsm1.NewValue(9, true)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, false)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true), tsm1.NewValue(7, false)}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
buf := make(tsm1.BooleanValues, 1000)
c := fs.KeyCursor("cpu", 8, false)
values, err := c.ReadBooleanBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[1].values[0],
data[3].values[0],
data[3].values[1],
data[0].values[0],
data[0].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
fs := tsm1.NewFileStore("")
// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, "eight"), tsm1.NewValue(9, "nine")}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "four"), tsm1.NewValue(7, "seven")}},
}
files, err := newFiles(data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
fs.Add(files...)
buf := make(tsm1.StringValues, 1000)
c := fs.KeyCursor("cpu", 8, false)
values, err := c.ReadStringBlock(buf)
if err != nil {
t.Fatalf("unexpected error reading values: %v", err)
}
exp := []tsm1.Value{
data[1].values[0],
data[3].values[0],
data[3].values[1],
data[0].values[0],
data[0].values[1],
}
if got, exp := len(values), len(exp); got != exp {
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
}
for i, v := range exp {
if got, exp := values[i].Value(), v.Value(); got != exp {
t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
}
}
}
func TestFileStore_SeekToDesc_Middle(t *testing.T) {
fs := tsm1.NewFileStore("")