From 9984cd5d6dd51216510d8d8d20018507fcb92882 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 11 Mar 2016 16:32:16 -0700 Subject: [PATCH] Fix skipping blocks at query time when overlaps exist Depending on how data is written across TSM files, it was possible to skip over some blocks at query time making it looks like data was missing. --- tsdb/engine/tsm1/file_store.go | 16 +- tsdb/engine/tsm1/file_store_test.go | 409 +++++++++++++++++++++++++--- 2 files changed, 385 insertions(+), 40 deletions(-) diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 612ed56c92..d8ba93b52b 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -772,7 +772,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) { // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadFloatBlockAt(cur.entry, nil) @@ -780,7 +780,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) { return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -816,7 +816,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error) // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadIntegerBlockAt(cur.entry, nil) @@ -824,7 +824,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error) return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -860,7 +860,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) { // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadStringBlockAt(cur.entry, nil) @@ -868,7 +868,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) { return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -904,7 +904,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error) // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadBooleanBlockAt(cur.entry, nil) @@ -912,7 +912,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error) return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 843b2ec2a3..26bd94480c 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -109,12 +109,15 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { t.Fatalf("unexpected error reading values: %v", err) } - exp := data[1] - if got, exp := len(values), len(exp.values); got != exp { + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + } + if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) } - for i, v := range exp.values { + for i, v := range exp { if got, exp := values[i].Value(), v.Value(); got != exp { t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) } @@ -126,15 +129,10 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { if err != nil { t.Fatalf("unexpected error reading values: %v", err) } - exp = data[3] - if got, exp := len(values), len(exp.values); got != exp { - t.Fatalf("value length mismatch: got %v, exp %v", got, exp) - } - for i, v := range exp.values { - if got, exp := values[i].Value(), v.Value(); got != exp { - t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) - } + exp = nil + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) } } @@ -170,7 +168,191 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) { for i, v := range exp.values { if got, exp := values[i].Value(), v.Value(); got != exp { - t.Fatalf("read value mismatch(%d): got %v, exp %d", i, got, exp) + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 0.0), tsm1.NewValue(1, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 4.0), tsm1.NewValue(7, 7.0)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.FloatValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadFloatBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(0)), tsm1.NewValue(1, int64(1))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(4)), tsm1.NewValue(7, int64(7))}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.IntegerValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadIntegerBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, true), tsm1.NewValue(1, false)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, false), tsm1.NewValue(7, true)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.BooleanValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadBooleanBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "zero"), tsm1.NewValue(1, "one")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "four"), tsm1.NewValue(7, "seven")}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.StringValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadStringBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) } } } @@ -310,34 +492,21 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { if err != nil { t.Fatalf("unexpected error reading values: %v", err) } - exp := data[3] - if got, exp := len(values), len(exp.values); got != exp { + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + } + if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) } - for i, v := range exp.values { - if got, exp := values[i].Value(), v.Value(); got != exp { - t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) - } - } - - // Check that calling Next will dedupe points - c.Next() - values, err = c.ReadFloatBlock(buf) - if err != nil { - t.Fatalf("unexpected error reading values: %v", err) - } - exp = data[1] - if got, exp := len(values), len(exp.values); got != exp { - t.Fatalf("value length mismatch: got %v, exp %v", got, exp) - } - - for i, v := range exp.values { + for i, v := range exp { if got, exp := values[i].Value(), v.Value(); got != exp { t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) } } } + func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { fs := tsm1.NewFileStore("") @@ -374,6 +543,182 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { } } +func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, 0.0), tsm1.NewValue(9, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 4.0), tsm1.NewValue(7, 7.0)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.FloatValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadFloatBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, int64(0)), tsm1.NewValue(9, int64(1))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(4)), tsm1.NewValue(7, int64(7))}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.IntegerValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadIntegerBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, true), tsm1.NewValue(9, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, false)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true), tsm1.NewValue(7, false)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.BooleanValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadBooleanBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, "eight"), tsm1.NewValue(9, "nine")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "four"), tsm1.NewValue(7, "seven")}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.StringValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadStringBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + func TestFileStore_SeekToDesc_Middle(t *testing.T) { fs := tsm1.NewFileStore("")