From 87892d79da75d3c8edc0a68e6130e52f066e9ece Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 7 Dec 2015 20:57:08 -0700 Subject: [PATCH] Dedupe points at query time if there are overlapping blocks --- tsdb/engine/tsm1/data_file.go | 9 ++ tsdb/engine/tsm1/file_store.go | 152 ++++++++++++++++++++++++---- tsdb/engine/tsm1/file_store_test.go | 104 +++++++++++++++++++ 3 files changed, 246 insertions(+), 19 deletions(-) diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index 1e65b409bb..d3b7a176cd 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -208,6 +208,15 @@ func (e *IndexEntry) Contains(t time.Time) bool { (e.MaxTime.Equal(t) || e.MaxTime.After(t)) } +func (e *IndexEntry) OverlapsTimeRange(min, max time.Time) bool { + return (e.MinTime.Equal(max) || e.MinTime.Before(max)) && + (e.MaxTime.Equal(min) || e.MaxTime.After(min)) +} + +func (e *IndexEntry) String() string { + return fmt.Sprintf("min=%s max=%s ofs=%d siz=%d", e.MinTime.UTC(), e.MaxTime.UTC(), e.Offset, e.Size) +} + func NewDirectIndex() TSMIndex { return &directIndex{ blocks: map[string]*indexEntries{}, diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index f4a53c5328..a65e3bd3a8 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -443,14 +443,31 @@ func ParseTSMFileName(name string) (int, int, error) { } type KeyCursor struct { - key string - fs *FileStore - seeks []*location - current *location - buf []Value + key string + fs *FileStore + + // seeks is all the file locations that we need to return during iteration. + seeks []*location + + // current is the set of blocks possibly containing the next set of points. + // Normally this is just one entry, but there may be multiple if points have + // been overwritten. + current []*location + buf []Value + + // pos is the index within seeks. Based on ascending, it will increment or + // decrement through the size of seeks slice. 
pos int ascending bool - ready bool + + // ready indicates that we know the files and blocks to seek to for the key. + ready bool + + // duplicates is a hint that there are overlapping blocks for this key in + // multiple files (e.g. points have been overwritten but not fully compacted) + // If this is true, we need to scan the duplicate blocks and dedup the points + // at query time until they are compacted. + duplicates bool } type location struct { @@ -462,8 +479,20 @@ func (c *KeyCursor) init(t time.Time, ascending bool) { if c.ready { return } - + c.ascending = ascending c.seeks = c.fs.locations(c.key, t, ascending) + + if len(c.seeks) > 0 { + for i := 1; i < len(c.seeks); i++ { + prev := c.seeks[i-1] + cur := c.seeks[i] + + if prev.entry.MaxTime.Equal(cur.entry.MinTime) || prev.entry.MaxTime.After(cur.entry.MinTime) { + c.duplicates = true + break + } + } + } c.buf = make([]Value, 1000) c.ready = true } @@ -478,42 +507,127 @@ func (c *KeyCursor) SeekTo(t time.Time, ascending bool) ([]Value, error) { if ascending { for i, e := range c.seeks { if t.Before(e.entry.MinTime) || e.entry.Contains(t) { - c.current = e - c.pos = i - break + // Record the position of the first block matching our seek time + if len(c.current) == 0 { + c.pos = i + } + + c.current = append(c.current, e) + + // If we don't have duplicates, break. Otherwise, keep looking for additional blocks containing + // this point. + if !c.duplicates { + break + } } } } else { for i := len(c.seeks) - 1; i >= 0; i-- { e := c.seeks[i] if t.After(e.entry.MaxTime) || e.entry.Contains(t) { - c.current = e - c.pos = i - break + // Record the position of the first block matching our seek time + if len(c.current) == 0 { + c.pos = i + } + + c.current = append(c.current, e) + + // If we don't have duplicates, break. Otherwise, keep looking for additional blocks containing + // this point. 
+ if !c.duplicates { + break + } } } } - if c.current == nil { + return c.readAt() +} + +func (c *KeyCursor) readAt() ([]Value, error) { + // No matching blocks to decode + if len(c.current) == 0 { return nil, nil } - return c.current.r.ReadAt(c.current.entry, c.buf[:0]) + + // First block is the oldest block containing the points we're searching for. + first := c.current[0] + values, err := first.r.ReadAt(first.entry, c.buf[:0]) + + // Only one block with this key and time range so return it + if len(c.current) == 1 { + return values, err + } + + // Otherwise, search the remaining blocks that overlap and append their values so we can + // dedup them. + for i := 1; i < len(c.current); i++ { + cur := c.current[i] + if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) { + c.pos++ + v, err := cur.r.ReadAt(cur.entry, nil) + if err != nil { + return nil, err + } + values = append(values, v...) + + } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) { + c.pos-- + + v, err := cur.r.ReadAt(cur.entry, nil) + if err != nil { + return nil, err + } + values = append(v, values...) 
+ } + } + + return Values(values).Deduplicate(), err } func (c *KeyCursor) Next(ascending bool) ([]Value, error) { + c.current = c.current[:0] + if ascending { c.pos++ if c.pos >= len(c.seeks) { return nil, nil } - c.current = c.seeks[c.pos] - return c.current.r.ReadAt(c.current.entry, c.buf[:0]) + + // Append the first matching block + c.current = []*location{c.seeks[c.pos]} + + // If we have overlapping blocks, append all their values so we can dedup + if c.duplicates { + first := c.seeks[c.pos] + for i := c.pos; i < len(c.seeks); i++ { + if c.seeks[i].entry.MinTime.Before(first.entry.MaxTime) || c.seeks[i].entry.MinTime.Equal(first.entry.MaxTime) { + c.current = append(c.current, c.seeks[i]) + } + } + } + + return c.readAt() + } else { c.pos-- if c.pos < 0 { return nil, nil } - c.current = c.seeks[c.pos] - return c.current.r.ReadAt(c.current.entry, c.buf[:0]) + + // Append the first matching block + c.current = []*location{c.seeks[c.pos]} + + // If we have overlapping blocks, append all their values so we can dedup + if c.duplicates { + first := c.seeks[c.pos] + for i := c.pos; i >= 0; i-- { + if c.seeks[i].entry.MaxTime.After(first.entry.MinTime) || c.seeks[i].entry.MaxTime.Equal(first.entry.MinTime) { + c.current = append(c.current, c.seeks[i]) + } + } + } + + return c.readAt() } } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 17474abe22..7f0586a14f 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -83,6 +83,59 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) { } } +func TestFileStore_SeekToAsc_Duplicate(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 4 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 3.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 
4.0)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + c := fs.KeyCursor("cpu") + // Search for an entry that exists in the second file + values, err := c.SeekTo(time.Unix(0, 0), true) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := data[1] + if got, exp := len(values), len(exp.values); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp.values { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } + + // Check that calling Next will dedupe points + values, err = c.Next(true) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + exp = data[3] + if got, exp := len(values), len(exp.values); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp.values { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } + +} func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) { fs := tsm1.NewFileStore("") @@ -226,6 +279,57 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) { } } +func TestFileStore_SeekToDesc_Duplicate(t *testing.T) { + fs := tsm1.NewFileStore("") + + // Setup 4 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 4.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(2, 0), 3.0)}}, + } + + files, err := newFiles(data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) 
+ + // Search for an entry that exists in the second file + c := fs.KeyCursor("cpu") + values, err := c.SeekTo(time.Unix(2, 0), false) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + exp := data[3] + if got, exp := len(values), len(exp.values); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp.values { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } + + // Check that calling Next will dedupe points + values, err = c.Next(false) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + exp = data[1] + if got, exp := len(values), len(exp.values); got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + for i, v := range exp.values { + if got, exp := values[i].Value(), v.Value(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp) + } + } +} func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) { fs := tsm1.NewFileStore("")