From 97504a552ca6fa408705cf30fc60b94e8a73053e Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 25 Apr 2016 22:13:58 -0600 Subject: [PATCH] Support time range tombstones in FileStore/KeyCursor --- tsdb/engine/tsm1/encoding.go | 65 ++++++++ tsdb/engine/tsm1/file_store.go | 94 ++++++++++-- tsdb/engine/tsm1/file_store_test.go | 220 ++++++++++++++++++++++++++++ tsdb/engine/tsm1/reader.go | 98 +++++++++++-- tsdb/engine/tsm1/reader_test.go | 170 +++++++++++++++++++++ tsdb/engine/tsm1/tombstone.go | 10 +- 6 files changed, 630 insertions(+), 27 deletions(-) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index c18df68147..d5e722ba94 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -86,6 +86,19 @@ func (a Values) Size() int { return sz } +func (a Values) Filter(min, max int64) Values { + var i int + for j := 0; j < len(a); j++ { + if a[j].UnixNano() >= min && a[j].UnixNano() <= max { + continue + } + + a[i] = a[j] + i++ + } + return a[:i] +} + // Encode converts the values to a byte slice. If there are no values, // this function panics. func (a Values) Encode(buf []byte) ([]byte, error) { @@ -354,6 +367,19 @@ func (a FloatValues) Deduplicate() FloatValues { return other } +func (a FloatValues) Filter(min, max int64) FloatValues { + var i int + for j := 0; j < len(a); j++ { + if a[j].UnixNano() >= min && a[j].UnixNano() <= max { + continue + } + + a[i] = a[j] + i++ + } + return a[:i] +} + // Sort methods func (a FloatValues) Len() int { return len(a) } func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -478,6 +504,19 @@ func (a BooleanValues) Deduplicate() BooleanValues { return other } +func (a BooleanValues) Filter(min, max int64) BooleanValues { + var i int + for j := 0; j < len(a); j++ { + if a[j].UnixNano() >= min && a[j].UnixNano() <= max { + continue + } + + a[i] = a[j] + i++ + } + return a[:i] +} + // Sort methods func (a BooleanValues) Len() int { return len(a) } func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -590,6 +629,19 @@ func (a IntegerValues) Deduplicate() IntegerValues { return other } +func (a IntegerValues) Filter(min, max int64) IntegerValues { + var i int + for j := 0; j < len(a); j++ { + if a[j].UnixNano() >= min && a[j].UnixNano() <= max { + continue + } + + a[i] = a[j] + i++ + } + return a[:i] +} + // Sort methods func (a IntegerValues) Len() int { return len(a) } func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -704,6 +756,19 @@ func (a StringValues) Deduplicate() StringValues { return other } +func (a StringValues) Filter(min, max int64) StringValues { + var i int + for j := 0; j < len(a); j++ { + if a[j].UnixNano() >= min && a[j].UnixNano() <= max { + continue + } + + a[i] = a[j] + i++ + } + return a[:i] +} + // Sort methods func (a StringValues) Len() int { return len(a) } func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 554a821918..93dc82026b 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "math" "os" "path/filepath" "sort" @@ -47,6 +48,9 @@ type TSMFile interface { // TimeRange returns the min and max time across all keys in the file. TimeRange() (int64, int64) + // TombstoneRange returns ranges of time that are deleted for the given key. + TombstoneRange(key string) []TimeRange + // KeyRange returns the min and max keys in the file. KeyRange() (string, string) @@ -67,6 +71,9 @@ type TSMFile interface { // Delete removes the keys from the set of keys available in this file. Delete(keys []string) error + // DeleteRange removes the values for keys between min and max. + DeleteRange(keys []string, min, max int64) error + // HasTombstones returns true if file contains values that have been deleted. HasTombstones() bool @@ -263,13 +270,18 @@ func (f *FileStore) Type(key string) (byte, error) { } func (f *FileStore) Delete(keys []string) error { + return f.DeleteRange(keys, math.MinInt64, math.MaxInt64) +} + +// DeleteRange removes the values for keys between min and max. +func (f *FileStore) DeleteRange(keys []string, min, max int64) error { f.mu.Lock() defer f.mu.Unlock() f.lastModified = time.Now() for _, file := range f.files { - if err := file.Delete(keys); err != nil { + if err := file.DeleteRange(keys, min, max); err != nil { return err } } @@ -530,6 +542,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []location { for _, fd := range filesSnapshot { minTime, maxTime := fd.TimeRange() + tombstones := fd.TombstoneRange(key) // If we ascending and the max time of the file is before where we want to start // skip it. if ascending && maxTime < t { @@ -544,6 +557,19 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []location { // the given key. fd.ReadEntries(key, &entries) for _, ie := range entries { + + // Skip any blocks only contain values that are tombstoned. + var skip bool + for _, t := range tombstones { + if t.Min <= ie.MinTime && t.Max >= ie.MaxTime { + skip = true + break + } + } + + if skip { + continue + } // If we ascending and the max time of a block is before where we are looking, skip // it since the data is out of our range if ascending && ie.MaxTime < t { @@ -804,15 +830,19 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, fdec *FloatDecoder, buf *[ values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, fdec, buf) first.read = true + tombstones := first.r.TombstoneRange(c.key) + // Only one block with this key and time range so return it if len(c.current) == 1 { - return values, err + return c.filterFloatValues(tombstones, values), err } // Otherwise, search the remaining blocks that overlap and append their values so we can // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] + tombstones := cur.r.TombstoneRange(c.key) + if c.ascending && !cur.read { cur.read = true c.pos++ @@ -822,7 +852,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, fdec *FloatDecoder, buf *[ if err != nil { return nil, err } - values = append(values, v...) + values = append(values, c.filterFloatValues(tombstones, v)...) } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -832,7 +862,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, fdec *FloatDecoder, buf *[ if err != nil { return nil, err } - values = append(v, values...) + values = append(c.filterFloatValues(tombstones, v), values...) } } @@ -852,15 +882,19 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf) first.read = true + tombstones := first.r.TombstoneRange(c.key) + // Only one block with this key and time range so return it if len(c.current) == 1 { - return values, err + return c.filterIntegerValues(tombstones, values), err } // Otherwise, search the remaining blocks that overlap and append their values so we can // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] + tombstones = cur.r.TombstoneRange(c.key) + if c.ascending && !cur.read { cur.read = true c.pos++ @@ -870,7 +904,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu if err != nil { return nil, err } - values = append(values, v...) + values = append(values, c.filterIntegerValues(tombstones, v)...) } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -880,7 +914,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu if err != nil { return nil, err } - values = append(v, values...) + values = append(c.filterIntegerValues(tombstones, v), values...) } } @@ -900,15 +934,18 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf) first.read = true + tombstones := first.r.TombstoneRange(c.key) + // Only one block with this key and time range so return it if len(c.current) == 1 { - return values, err + return c.filterStringValues(tombstones, values), err } // Otherwise, search the remaining blocks that overlap and append their values so we can // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] + tombstones = cur.r.TombstoneRange(c.key) if c.ascending && !cur.read { cur.read = true c.pos++ @@ -917,7 +954,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf if err != nil { return nil, err } - values = append(values, v...) + values = append(values, c.filterStringValues(tombstones, v)...) } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -927,7 +964,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf if err != nil { return nil, err } - values = append(v, values...) + values = append(c.filterStringValues(tombstones, v), values...) } } @@ -947,15 +984,18 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf) first.read = true + tombstones := first.r.TombstoneRange(c.key) + // Only one block with this key and time range so return it if len(c.current) == 1 { - return values, err + return c.filterBooleanValues(tombstones, values), err } // Otherwise, search the remaining blocks that overlap and append their values so we can // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] + tombstones = cur.r.TombstoneRange(c.key) if c.ascending && !cur.read { cur.read = true c.pos++ @@ -965,7 +1005,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu if err != nil { return nil, err } - values = append(values, v...) + values = append(values, c.filterBooleanValues(tombstones, v)...) } else if !c.ascending && !cur.read { cur.read = true c.pos-- @@ -975,13 +1015,41 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu if err != nil { return nil, err } - values = append(v, values...) + values = append(c.filterBooleanValues(tombstones, v), values...) } } return BooleanValues(values).Deduplicate(), err } +func (c *KeyCursor) filterFloatValues(tombstones []TimeRange, values FloatValues) FloatValues { + for _, t := range tombstones { + values = values.Filter(t.Min, t.Max) + } + return values +} + +func (c *KeyCursor) filterIntegerValues(tombstones []TimeRange, values IntegerValues) IntegerValues { + for _, t := range tombstones { + values = values.Filter(t.Min, t.Max) + } + return values +} + +func (c *KeyCursor) filterStringValues(tombstones []TimeRange, values StringValues) StringValues { + for _, t := range tombstones { + values = values.Filter(t.Min, t.Max) + } + return values +} + +func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanValues) BooleanValues { + for _, t := range tombstones { + values = values.Filter(t.Min, t.Max) + } + return values +} + type tsmReaders []TSMFile func (a tsmReaders) Len() int { return len(a) } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index b7fca7dffc..4de4a0e540 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -832,6 +832,226 @@ func TestFileStore_SeekToDesc_End(t *testing.T) { } } +func TestKeyCursor_TombstoneRange(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil { + t.Fatalf("unexpected error delete range: %v", err) + } + + buf := make([]tsm1.FloatValue, 1000) + c := fs.KeyCursor("cpu", 0, true) + expValues := []int{0, 2} + for _, v := range expValues { + values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + exp := data[v] + if got, exp := len(values), 1; got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[0].String(), exp.values[0].String(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp) + } + c.Next() + } +} + +func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{ + tsm1.NewValue(0, 1.0), + tsm1.NewValue(1, 2.0), + tsm1.NewValue(2, 3.0)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil { + t.Fatalf("unexpected error delete range: %v", err) + } + + buf := make([]tsm1.FloatValue, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + expValues := []tsm1.Value{data[0].values[0], data[0].values[2]} + for i, v := range expValues { + exp := v + if got, exp := len(values), 2; got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[i].String(), exp.String(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp) + } + } +} + +func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{ + tsm1.NewValue(0, int64(1)), + tsm1.NewValue(1, int64(2)), + tsm1.NewValue(2, int64(3))}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil { + t.Fatalf("unexpected error delete range: %v", err) + } + + buf := make([]tsm1.IntegerValue, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + expValues := []tsm1.Value{data[0].values[0], data[0].values[2]} + for i, v := range expValues { + exp := v + if got, exp := len(values), 2; got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[i].String(), exp.String(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp) + } + } +} + +func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{ + tsm1.NewValue(0, "1"), + tsm1.NewValue(1, "2"), + tsm1.NewValue(2, "3")}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil { + t.Fatalf("unexpected error delete range: %v", err) + } + + buf := make([]tsm1.StringValue, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + expValues := []tsm1.Value{data[0].values[0], data[0].values[2]} + for i, v := range expValues { + exp := v + if got, exp := len(values), 2; got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[i].String(), exp.String(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp) + } + } +} + +func TestKeyCursor_TombstoneRange_PartialBoolean(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{ + tsm1.NewValue(0, true), + tsm1.NewValue(1, false), + tsm1.NewValue(2, true)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Add(files...) + + if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil { + t.Fatalf("unexpected error delete range: %v", err) + } + + buf := make([]tsm1.BooleanValue, 1000) + c := fs.KeyCursor("cpu", 0, true) + values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + expValues := []tsm1.Value{data[0].values[0], data[0].values[2]} + for i, v := range expValues { + exp := v + if got, exp := len(values), 2; got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[i].String(), exp.String(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp) + } + } +} + func TestFileStore_Open(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 200e71d3fa..01b27fc766 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -37,6 +37,9 @@ type TSMIndex interface { // Delete removes the given keys from the index. Delete(keys []string) + // DeleteRange removes the given keys with data between minTime and maxTime from the index. + DeleteRange(keys []string, minTime, maxTime int64) + // Contains return true if the given key exists in the index. Contains(key string) bool @@ -73,6 +76,9 @@ type TSMIndex interface { // TimeRange returns the min and max time across all keys in the file. TimeRange() (int64, int64) + // TombstoneRange returns ranges of time that are deleted for the given key. + TombstoneRange(key string) []TimeRange + // KeyRange returns the min and max keys in the file. KeyRange() (string, string) @@ -147,7 +153,7 @@ func (b *BlockIterator) Read() (string, int64, int64, uint32, []byte, error) { // blockAccessor abstracts a method of accessing blocks from a // TSM file. type blockAccessor interface { - init() (TSMIndex, error) + init() (*indirectIndex, error) read(key string, timestamp int64) ([]Value, error) readAll(key string) ([]Value, error) readBlock(entry *IndexEntry, values []Value) ([]Value, error) @@ -320,6 +326,16 @@ func (t *TSMReader) ContainsValue(key string, ts int64) bool { return t.index.ContainsValue(key, ts) } +// DeleteRange removes the given points for keys between minTime and maxTime +func (t *TSMReader) DeleteRange(keys []string, minTime, maxTime int64) error { + if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil { + return err + } + + t.index.DeleteRange(keys, minTime, maxTime) + return nil +} + func (t *TSMReader) Delete(keys []string) error { if err := t.tombstoner.Add(keys); err != nil { return err @@ -381,6 +397,13 @@ func (t *TSMReader) TombstoneFiles() []FileStat { return t.tombstoner.TombstoneFiles() } +// TombstoneRange returns ranges of time that are deleted for the given key. +func (t *TSMReader) TombstoneRange(key string) []TimeRange { + t.mu.RLock() + defer t.mu.RUnlock() + return t.index.TombstoneRange(key) +} + func (t *TSMReader) Stats() FileStat { minTime, maxTime := t.index.TimeRange() minKey, maxKey := t.index.KeyRange() @@ -455,19 +478,21 @@ type indirectIndex struct { // minTime, maxTime are the minimum and maximum times contained in the file across all // series. minTime, maxTime int64 + + // tombstones contains only the tombstoned keys with subset of time values deleted. An + // entry would exist here if a subset of the points for a key were deleted and the file + // had not be re-compacted to remove the points on disk. + tombstones map[string][]TimeRange } -func NewIndirectIndex() TSMIndex { - return &indirectIndex{} +type TimeRange struct { + Min, Max int64 } -// Add records a new block entry for a key in the index. -func (d *indirectIndex) Add(key string, blockType byte, minTime, maxTime int64, offset int64, size uint32) { - panic("unsupported operation") -} - -func (d *indirectIndex) Write(w io.Writer) error { - panic("unsupported operation") +func NewIndirectIndex() *indirectIndex { + return &indirectIndex{ + tombstones: make(map[string][]TimeRange), + } } // search returns the index of i in offsets for where key is located. If key is not @@ -629,6 +654,34 @@ func (d *indirectIndex) Delete(keys []string) { d.offsets = offsets } +func (d *indirectIndex) DeleteRange(keys []string, minTime, maxTime int64) { + // No keys, nothing to do + if len(keys) == 0 { + return + } + + // If we're deleting the max time range, just use tombstoning to remove the + // key from the offsets slice + if minTime == math.MinInt64 && maxTime == math.MaxInt64 { + d.Delete(keys) + return + } + + d.mu.Lock() + defer d.mu.Unlock() + + for _, k := range keys { + d.tombstones[k] = append(d.tombstones[k], TimeRange{minTime, maxTime}) + } +} + +func (d *indirectIndex) TombstoneRange(key string) []TimeRange { + d.mu.RLock() + r := d.tombstones[key] + d.mu.RUnlock() + return r +} + func (d *indirectIndex) Contains(key string) bool { return len(d.Entries(key)) > 0 } @@ -751,10 +804,10 @@ type mmapAccessor struct { f *os.File b []byte - index TSMIndex + index *indirectIndex } -func (m *mmapAccessor) init() (TSMIndex, error) { +func (m *mmapAccessor) init() (*indirectIndex, error) { m.mu.Lock() defer m.mu.Unlock() @@ -822,6 +875,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec if int64(len(m.b)) < entry.Offset+int64(entry.Size) { return nil, ErrTSMClosed } + //TODO: Validate checksum a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values) if err != nil { @@ -898,6 +952,8 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) { return nil, nil } + tombstones := m.index.TombstoneRange(key) + m.mu.RLock() defer m.mu.RUnlock() @@ -905,6 +961,18 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) { var err error var values []Value for _, block := range blocks { + var skip bool + for _, t := range tombstones { + // Should we skip this block because it contains points that have been deleted + if t.Min <= block.MinTime && t.Max >= block.MaxTime { + skip = true + break + } + } + + if skip { + continue + } //TODO: Validate checksum temp = temp[:0] // The +4 is the 4 byte checksum length @@ -912,6 +980,12 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) { if err != nil { return nil, err } + + // Filter out any values that were deleted + for _, t := range tombstones { + temp = Values(temp).Filter(t.Min, t.Max) + } + values = append(values, temp...) } diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index fa66697873..3f8e8e9091 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -2,6 +2,7 @@ package tsm1_test import ( "fmt" + "math" "os" "testing" @@ -336,6 +337,175 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) { } } +func TestTSMReader_MMAP_TombstoneRange(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + f := MustTempFile(dir) + defer f.Close() + + w, err := tsm1.NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + expValues := []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + } + if err := w.Write("cpu", expValues); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error writing index: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + f, err = os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error open file: %v", err) + } + + r, err := tsm1.NewTSMReader(f) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + if err := r.DeleteRange([]string{"cpu"}, 2, math.MaxInt64); err != nil { + t.Fatalf("unexpected error deleting: %v", err) + } + defer r.Close() + + values, err := r.ReadAll("cpu") + if err != nil { + t.Fatalf("unexpected error reading all: %v", err) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[0].String(), expValues[0].String(); got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } +} + +func TestTSMReader_MMAP_TombstoneFullRange(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + f := MustTempFile(dir) + defer f.Close() + + w, err := tsm1.NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + expValues := []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + } + if err := w.Write("cpu", expValues); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error writing index: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + f, err = os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error open file: %v", err) + } + + r, err := tsm1.NewTSMReader(f) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + if err := r.DeleteRange([]string{"cpu"}, math.MinInt64, math.MaxInt64); err != nil { + t.Fatalf("unexpected error deleting: %v", err) + } + defer r.Close() + + values, err := r.ReadAll("cpu") + if err != nil { + t.Fatalf("unexpected error reading all: %v", err) + } + + if got, exp := len(values), 0; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } +} + +func TestTSMReader_MMAP_TombstoneMultipleRanges(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + f := MustTempFile(dir) + defer f.Close() + + w, err := tsm1.NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + expValues := []tsm1.Value{ + tsm1.NewValue(1, 1.0), + tsm1.NewValue(2, 2.0), + tsm1.NewValue(3, 3.0), + tsm1.NewValue(4, 4.0), + tsm1.NewValue(5, 5.0), + } + if err := w.Write("cpu", expValues); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error writing index: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + f, err = os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error open file: %v", err) + } + + r, err := tsm1.NewTSMReader(f) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + defer r.Close() + + if err := r.DeleteRange([]string{"cpu"}, 2, 2); err != nil { + t.Fatalf("unexpected error deleting: %v", err) + } + + if err := r.DeleteRange([]string{"cpu"}, 4, 4); err != nil { + t.Fatalf("unexpected error deleting: %v", err) + } + + values, err := r.ReadAll("cpu") + if err != nil { + t.Fatalf("unexpected error reading all: %v", err) + } + + if got, exp := len(values), 3; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } +} + func TestTSMReader_MMAP_Stats(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) diff --git a/tsdb/engine/tsm1/tombstone.go b/tsdb/engine/tsm1/tombstone.go index f4a9642f3f..6cb345148f 100644 --- a/tsdb/engine/tsm1/tombstone.go +++ b/tsdb/engine/tsm1/tombstone.go @@ -29,7 +29,13 @@ type Tombstone struct { Min, Max int64 } +// Add add the all keys to the tombstone func (t *Tombstoner) Add(keys []string) error { + return t.AddRange(keys, math.MinInt64, math.MaxInt64) +} + +// AddRange adds all keys to the tombstone specifying only the data between min and max to be removed. +func (t *Tombstoner) AddRange(keys []string, min, max int64) error { t.mu.Lock() defer t.mu.Unlock() @@ -47,8 +53,8 @@ func (t *Tombstoner) Add(keys []string) error { for _, k := range keys { tombstones = append(tombstones, Tombstone{ Key: k, - Min: math.MinInt64, - Max: math.MaxInt64, + Min: min, + Max: max, }) }