diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 612ed56c92..d8ba93b52b 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -772,7 +772,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) { // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadFloatBlockAt(cur.entry, nil) @@ -780,7 +780,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) { return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -816,7 +816,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error) // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadIntegerBlockAt(cur.entry, nil) @@ -824,7 +824,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error) return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -860,7 +860,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) { // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadStringBlockAt(cur.entry, nil) @@ -868,7 +868,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) { return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -904,7 +904,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error) // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + if c.ascending && !cur.read { cur.read = true c.pos++ v, err := cur.r.ReadBooleanBlockAt(cur.entry, nil) @@ -912,7 +912,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error) return nil, err } values = append(values, v...) - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + } else if !c.ascending && !cur.read { cur.read = true c.pos-- diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 843b2ec2a3..26bd94480c 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -109,12 +109,15 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { t.Fatalf("unexpected error reading values: %v", err) } - exp := data[1] - if got, exp := len(values), len(exp.values); got != exp { + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + } + if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) } - for i, v := range exp.values { + for i, v := range exp { if got, exp := values[i].Value(), v.Value(); got != exp { t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) } @@ -126,15 +129,10 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { if err != nil { t.Fatalf("unexpected error reading values: %v", err) } - exp = data[3] - if got, exp := len(values), len(exp.values); got != exp { - t.Fatalf("value length mismatch: got %v, exp %v", got, exp) - } - for i, v := range exp.values { - if got, exp := values[i].Value(), v.Value(); got != exp { - t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) - } + exp = nil + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) } } @@ -170,7 +168,191 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) { for i, v := range exp.values { if got, exp := values[i].Value(), v.Value(); got != exp { - t.Fatalf("read value mismatch(%d): got %v, exp %d", i, got, exp) + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 0.0), tsm1.NewValue(1, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 4.0), tsm1.NewValue(7, 7.0)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.FloatValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadFloatBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(0)), tsm1.NewValue(1, int64(1))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(4)), tsm1.NewValue(7, int64(7))}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.IntegerValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadIntegerBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, true), tsm1.NewValue(1, false)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, false), tsm1.NewValue(7, true)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.BooleanValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadBooleanBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +// Tests that seeking and reading all blocks that contain overlapping points does +// not skip any blocks. +func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "zero"), tsm1.NewValue(1, "one")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "four"), tsm1.NewValue(7, "seven")}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + // Search for an entry that exists in the second file + buf := make(tsm1.StringValues, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadStringBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[3].values[0], + data[0].values[1], + data[1].values[0], + data[2].values[0], + data[3].values[1], + } + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) } } } @@ -310,34 +492,21 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { if err != nil { t.Fatalf("unexpected error reading values: %v", err) } - exp := data[3] - if got, exp := len(values), len(exp.values); got != exp { + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + } + if got, exp := len(values), len(exp); got != exp { t.Fatalf("value length mismatch: got %v, exp %v", got, exp) } - for i, v := range exp.values { - if got, exp := values[i].Value(), v.Value(); got != exp { - t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) - } - } - - // Check that calling Next will dedupe points - c.Next() - values, err = c.ReadFloatBlock(buf) - if err != nil { - t.Fatalf("unexpected error reading values: %v", err) - } - exp = data[1] - if got, exp := len(values), len(exp.values); got != exp { - t.Fatalf("value length mismatch: got %v, exp %v", got, exp) - } - - for i, v := range exp.values { + for i, v := range exp { if got, exp := values[i].Value(), v.Value(); got != exp { t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) } } } + func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { fs := tsm1.NewFileStore("") @@ -374,6 +543,182 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { } } +func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, 0.0), tsm1.NewValue(9, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 4.0), tsm1.NewValue(7, 7.0)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.FloatValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadFloatBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, int64(0)), tsm1.NewValue(9, int64(1))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(4)), tsm1.NewValue(7, int64(7))}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.IntegerValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadIntegerBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, true), tsm1.NewValue(9, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, false)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true), tsm1.NewValue(7, false)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.BooleanValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadBooleanBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + +func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, "eight"), tsm1.NewValue(9, "nine")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "four"), tsm1.NewValue(7, "seven")}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + buf := make(tsm1.StringValues, 1000) + c := fs.KeyCursor("cpu", 8, false) + values, err := c.ReadStringBlock(buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := []tsm1.Value{ + data[1].values[0], + data[3].values[0], + data[3].values[1], + data[0].values[0], + data[0].values[1], + } + + if got, exp := len(values), len(exp); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} + func TestFileStore_SeekToDesc_Middle(t *testing.T) { fs := tsm1.NewFileStore("")