Support time range tombstones in FileStore/KeyCursor
parent
27c2bc3f15
commit
97504a552c
|
@ -86,6 +86,19 @@ func (a Values) Size() int {
|
|||
return sz
|
||||
}
|
||||
|
||||
func (a Values) Filter(min, max int64) Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Encode converts the values to a byte slice. If there are no values,
|
||||
// this function panics.
|
||||
func (a Values) Encode(buf []byte) ([]byte, error) {
|
||||
|
@ -354,6 +367,19 @@ func (a FloatValues) Deduplicate() FloatValues {
|
|||
return other
|
||||
}
|
||||
|
||||
func (a FloatValues) Filter(min, max int64) FloatValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a FloatValues) Len() int { return len(a) }
|
||||
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
@ -478,6 +504,19 @@ func (a BooleanValues) Deduplicate() BooleanValues {
|
|||
return other
|
||||
}
|
||||
|
||||
func (a BooleanValues) Filter(min, max int64) BooleanValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a BooleanValues) Len() int { return len(a) }
|
||||
func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
@ -590,6 +629,19 @@ func (a IntegerValues) Deduplicate() IntegerValues {
|
|||
return other
|
||||
}
|
||||
|
||||
func (a IntegerValues) Filter(min, max int64) IntegerValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a IntegerValues) Len() int { return len(a) }
|
||||
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
@ -704,6 +756,19 @@ func (a StringValues) Deduplicate() StringValues {
|
|||
return other
|
||||
}
|
||||
|
||||
func (a StringValues) Filter(min, max int64) StringValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a StringValues) Len() int { return len(a) }
|
||||
func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
@ -47,6 +48,9 @@ type TSMFile interface {
|
|||
// TimeRange returns the min and max time across all keys in the file.
|
||||
TimeRange() (int64, int64)
|
||||
|
||||
// TombstoneRange returns ranges of time that are deleted for the given key.
|
||||
TombstoneRange(key string) []TimeRange
|
||||
|
||||
// KeyRange returns the min and max keys in the file.
|
||||
KeyRange() (string, string)
|
||||
|
||||
|
@ -67,6 +71,9 @@ type TSMFile interface {
|
|||
// Delete removes the keys from the set of keys available in this file.
|
||||
Delete(keys []string) error
|
||||
|
||||
// DeleteRange removes the values for keys between min and max.
|
||||
DeleteRange(keys []string, min, max int64) error
|
||||
|
||||
// HasTombstones returns true if file contains values that have been deleted.
|
||||
HasTombstones() bool
|
||||
|
||||
|
@ -263,13 +270,18 @@ func (f *FileStore) Type(key string) (byte, error) {
|
|||
}
|
||||
|
||||
func (f *FileStore) Delete(keys []string) error {
|
||||
return f.DeleteRange(keys, math.MinInt64, math.MaxInt64)
|
||||
}
|
||||
|
||||
// DeleteRange removes the values for keys between min and max.
|
||||
func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
f.lastModified = time.Now()
|
||||
|
||||
for _, file := range f.files {
|
||||
if err := file.Delete(keys); err != nil {
|
||||
if err := file.DeleteRange(keys, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -530,6 +542,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []location {
|
|||
for _, fd := range filesSnapshot {
|
||||
minTime, maxTime := fd.TimeRange()
|
||||
|
||||
tombstones := fd.TombstoneRange(key)
|
||||
// If we ascending and the max time of the file is before where we want to start
|
||||
// skip it.
|
||||
if ascending && maxTime < t {
|
||||
|
@ -544,6 +557,19 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []location {
|
|||
// the given key.
|
||||
fd.ReadEntries(key, &entries)
|
||||
for _, ie := range entries {
|
||||
|
||||
// Skip any blocks only contain values that are tombstoned.
|
||||
var skip bool
|
||||
for _, t := range tombstones {
|
||||
if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
// If we ascending and the max time of a block is before where we are looking, skip
|
||||
// it since the data is out of our range
|
||||
if ascending && ie.MaxTime < t {
|
||||
|
@ -804,15 +830,19 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, fdec *FloatDecoder, buf *[
|
|||
values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, fdec, buf)
|
||||
first.read = true
|
||||
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, err
|
||||
return c.filterFloatValues(tombstones, values), err
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
if c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos++
|
||||
|
@ -822,7 +852,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, fdec *FloatDecoder, buf *[
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(values, v...)
|
||||
values = append(values, c.filterFloatValues(tombstones, v)...)
|
||||
} else if !c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos--
|
||||
|
@ -832,7 +862,7 @@ func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, fdec *FloatDecoder, buf *[
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(v, values...)
|
||||
values = append(c.filterFloatValues(tombstones, v), values...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -852,15 +882,19 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf)
|
||||
first.read = true
|
||||
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, err
|
||||
return c.filterIntegerValues(tombstones, values), err
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones = cur.r.TombstoneRange(c.key)
|
||||
|
||||
if c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos++
|
||||
|
@ -870,7 +904,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(values, v...)
|
||||
values = append(values, c.filterIntegerValues(tombstones, v)...)
|
||||
} else if !c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos--
|
||||
|
@ -880,7 +914,7 @@ func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, bu
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(v, values...)
|
||||
values = append(c.filterIntegerValues(tombstones, v), values...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -900,15 +934,18 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf)
|
||||
first.read = true
|
||||
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, err
|
||||
return c.filterStringValues(tombstones, values), err
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones = cur.r.TombstoneRange(c.key)
|
||||
if c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos++
|
||||
|
@ -917,7 +954,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(values, v...)
|
||||
values = append(values, c.filterStringValues(tombstones, v)...)
|
||||
} else if !c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos--
|
||||
|
@ -927,7 +964,7 @@ func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(v, values...)
|
||||
values = append(c.filterStringValues(tombstones, v), values...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -947,15 +984,18 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu
|
|||
values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf)
|
||||
first.read = true
|
||||
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, err
|
||||
return c.filterBooleanValues(tombstones, values), err
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones = cur.r.TombstoneRange(c.key)
|
||||
if c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos++
|
||||
|
@ -965,7 +1005,7 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(values, v...)
|
||||
values = append(values, c.filterBooleanValues(tombstones, v)...)
|
||||
} else if !c.ascending && !cur.read {
|
||||
cur.read = true
|
||||
c.pos--
|
||||
|
@ -975,13 +1015,41 @@ func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, bu
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(v, values...)
|
||||
values = append(c.filterBooleanValues(tombstones, v), values...)
|
||||
}
|
||||
}
|
||||
|
||||
return BooleanValues(values).Deduplicate(), err
|
||||
}
|
||||
|
||||
func (c *KeyCursor) filterFloatValues(tombstones []TimeRange, values FloatValues) FloatValues {
|
||||
for _, t := range tombstones {
|
||||
values = values.Filter(t.Min, t.Max)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
func (c *KeyCursor) filterIntegerValues(tombstones []TimeRange, values IntegerValues) IntegerValues {
|
||||
for _, t := range tombstones {
|
||||
values = values.Filter(t.Min, t.Max)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
func (c *KeyCursor) filterStringValues(tombstones []TimeRange, values StringValues) StringValues {
|
||||
for _, t := range tombstones {
|
||||
values = values.Filter(t.Min, t.Max)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanValues) BooleanValues {
|
||||
for _, t := range tombstones {
|
||||
values = values.Filter(t.Min, t.Max)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
type tsmReaders []TSMFile
|
||||
|
||||
func (a tsmReaders) Len() int { return len(a) }
|
||||
|
|
|
@ -832,6 +832,226 @@ func TestFileStore_SeekToDesc_End(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKeyCursor_TombstoneRange(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
fs := tsm1.NewFileStore(dir)
|
||||
|
||||
// Setup 3 files
|
||||
data := []keyValues{
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating files: %v", err)
|
||||
}
|
||||
|
||||
fs.Add(files...)
|
||||
|
||||
if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
|
||||
t.Fatalf("unexpected error delete range: %v", err)
|
||||
}
|
||||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
expValues := []int{0, 2}
|
||||
for _, v := range expValues {
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
||||
exp := data[v]
|
||||
if got, exp := len(values), 1; got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := values[0].String(), exp.values[0].String(); got != exp {
|
||||
t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp)
|
||||
}
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
fs := tsm1.NewFileStore(dir)
|
||||
|
||||
// Setup 3 files
|
||||
data := []keyValues{
|
||||
keyValues{"cpu", []tsm1.Value{
|
||||
tsm1.NewValue(0, 1.0),
|
||||
tsm1.NewValue(1, 2.0),
|
||||
tsm1.NewValue(2, 3.0)}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating files: %v", err)
|
||||
}
|
||||
|
||||
fs.Add(files...)
|
||||
|
||||
if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
|
||||
t.Fatalf("unexpected error delete range: %v", err)
|
||||
}
|
||||
|
||||
buf := make([]tsm1.FloatValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadFloatBlock(&tsm1.TimeDecoder{}, &tsm1.FloatDecoder{}, &buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{data[0].values[0], data[0].values[2]}
|
||||
for i, v := range expValues {
|
||||
exp := v
|
||||
if got, exp := len(values), 2; got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := values[i].String(), exp.String(); got != exp {
|
||||
t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
fs := tsm1.NewFileStore(dir)
|
||||
|
||||
// Setup 3 files
|
||||
data := []keyValues{
|
||||
keyValues{"cpu", []tsm1.Value{
|
||||
tsm1.NewValue(0, int64(1)),
|
||||
tsm1.NewValue(1, int64(2)),
|
||||
tsm1.NewValue(2, int64(3))}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating files: %v", err)
|
||||
}
|
||||
|
||||
fs.Add(files...)
|
||||
|
||||
if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
|
||||
t.Fatalf("unexpected error delete range: %v", err)
|
||||
}
|
||||
|
||||
buf := make([]tsm1.IntegerValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadIntegerBlock(&tsm1.TimeDecoder{}, &tsm1.IntegerDecoder{}, &buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{data[0].values[0], data[0].values[2]}
|
||||
for i, v := range expValues {
|
||||
exp := v
|
||||
if got, exp := len(values), 2; got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := values[i].String(), exp.String(); got != exp {
|
||||
t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
fs := tsm1.NewFileStore(dir)
|
||||
|
||||
// Setup 3 files
|
||||
data := []keyValues{
|
||||
keyValues{"cpu", []tsm1.Value{
|
||||
tsm1.NewValue(0, "1"),
|
||||
tsm1.NewValue(1, "2"),
|
||||
tsm1.NewValue(2, "3")}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating files: %v", err)
|
||||
}
|
||||
|
||||
fs.Add(files...)
|
||||
|
||||
if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
|
||||
t.Fatalf("unexpected error delete range: %v", err)
|
||||
}
|
||||
|
||||
buf := make([]tsm1.StringValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadStringBlock(&tsm1.TimeDecoder{}, &tsm1.StringDecoder{}, &buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{data[0].values[0], data[0].values[2]}
|
||||
for i, v := range expValues {
|
||||
exp := v
|
||||
if got, exp := len(values), 2; got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := values[i].String(), exp.String(); got != exp {
|
||||
t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeyCursor_TombstoneRange_PartialBoolean(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
fs := tsm1.NewFileStore(dir)
|
||||
|
||||
// Setup 3 files
|
||||
data := []keyValues{
|
||||
keyValues{"cpu", []tsm1.Value{
|
||||
tsm1.NewValue(0, true),
|
||||
tsm1.NewValue(1, false),
|
||||
tsm1.NewValue(2, true)}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating files: %v", err)
|
||||
}
|
||||
|
||||
fs.Add(files...)
|
||||
|
||||
if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
|
||||
t.Fatalf("unexpected error delete range: %v", err)
|
||||
}
|
||||
|
||||
buf := make([]tsm1.BooleanValue, 1000)
|
||||
c := fs.KeyCursor("cpu", 0, true)
|
||||
values, err := c.ReadBooleanBlock(&tsm1.TimeDecoder{}, &tsm1.BooleanDecoder{}, &buf)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading values: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{data[0].values[0], data[0].values[2]}
|
||||
for i, v := range expValues {
|
||||
exp := v
|
||||
if got, exp := len(values), 2; got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := values[i].String(), exp.String(); got != exp {
|
||||
t.Fatalf("read value mismatch(%d): got %v, exp %d", 0, got, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStore_Open(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
|
|
@ -37,6 +37,9 @@ type TSMIndex interface {
|
|||
// Delete removes the given keys from the index.
|
||||
Delete(keys []string)
|
||||
|
||||
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
|
||||
DeleteRange(keys []string, minTime, maxTime int64)
|
||||
|
||||
// Contains return true if the given key exists in the index.
|
||||
Contains(key string) bool
|
||||
|
||||
|
@ -73,6 +76,9 @@ type TSMIndex interface {
|
|||
// TimeRange returns the min and max time across all keys in the file.
|
||||
TimeRange() (int64, int64)
|
||||
|
||||
// TombstoneRange returns ranges of time that are deleted for the given key.
|
||||
TombstoneRange(key string) []TimeRange
|
||||
|
||||
// KeyRange returns the min and max keys in the file.
|
||||
KeyRange() (string, string)
|
||||
|
||||
|
@ -147,7 +153,7 @@ func (b *BlockIterator) Read() (string, int64, int64, uint32, []byte, error) {
|
|||
// blockAccessor abstracts a method of accessing blocks from a
|
||||
// TSM file.
|
||||
type blockAccessor interface {
|
||||
init() (TSMIndex, error)
|
||||
init() (*indirectIndex, error)
|
||||
read(key string, timestamp int64) ([]Value, error)
|
||||
readAll(key string) ([]Value, error)
|
||||
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
|
@ -320,6 +326,16 @@ func (t *TSMReader) ContainsValue(key string, ts int64) bool {
|
|||
return t.index.ContainsValue(key, ts)
|
||||
}
|
||||
|
||||
// DeleteRange removes the given points for keys between minTime and maxTime
|
||||
func (t *TSMReader) DeleteRange(keys []string, minTime, maxTime int64) error {
|
||||
if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.index.DeleteRange(keys, minTime, maxTime)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TSMReader) Delete(keys []string) error {
|
||||
if err := t.tombstoner.Add(keys); err != nil {
|
||||
return err
|
||||
|
@ -381,6 +397,13 @@ func (t *TSMReader) TombstoneFiles() []FileStat {
|
|||
return t.tombstoner.TombstoneFiles()
|
||||
}
|
||||
|
||||
// TombstoneRange returns ranges of time that are deleted for the given key.
|
||||
func (t *TSMReader) TombstoneRange(key string) []TimeRange {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.index.TombstoneRange(key)
|
||||
}
|
||||
|
||||
func (t *TSMReader) Stats() FileStat {
|
||||
minTime, maxTime := t.index.TimeRange()
|
||||
minKey, maxKey := t.index.KeyRange()
|
||||
|
@ -455,19 +478,21 @@ type indirectIndex struct {
|
|||
// minTime, maxTime are the minimum and maximum times contained in the file across all
|
||||
// series.
|
||||
minTime, maxTime int64
|
||||
|
||||
// tombstones contains only the tombstoned keys with subset of time values deleted. An
|
||||
// entry would exist here if a subset of the points for a key were deleted and the file
|
||||
// had not be re-compacted to remove the points on disk.
|
||||
tombstones map[string][]TimeRange
|
||||
}
|
||||
|
||||
func NewIndirectIndex() TSMIndex {
|
||||
return &indirectIndex{}
|
||||
type TimeRange struct {
|
||||
Min, Max int64
|
||||
}
|
||||
|
||||
// Add records a new block entry for a key in the index.
|
||||
func (d *indirectIndex) Add(key string, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
|
||||
panic("unsupported operation")
|
||||
}
|
||||
|
||||
func (d *indirectIndex) Write(w io.Writer) error {
|
||||
panic("unsupported operation")
|
||||
func NewIndirectIndex() *indirectIndex {
|
||||
return &indirectIndex{
|
||||
tombstones: make(map[string][]TimeRange),
|
||||
}
|
||||
}
|
||||
|
||||
// search returns the index of i in offsets for where key is located. If key is not
|
||||
|
@ -629,6 +654,34 @@ func (d *indirectIndex) Delete(keys []string) {
|
|||
d.offsets = offsets
|
||||
}
|
||||
|
||||
func (d *indirectIndex) DeleteRange(keys []string, minTime, maxTime int64) {
|
||||
// No keys, nothing to do
|
||||
if len(keys) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// If we're deleting the max time range, just use tombstoning to remove the
|
||||
// key from the offsets slice
|
||||
if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
|
||||
d.Delete(keys)
|
||||
return
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
for _, k := range keys {
|
||||
d.tombstones[k] = append(d.tombstones[k], TimeRange{minTime, maxTime})
|
||||
}
|
||||
}
|
||||
|
||||
func (d *indirectIndex) TombstoneRange(key string) []TimeRange {
|
||||
d.mu.RLock()
|
||||
r := d.tombstones[key]
|
||||
d.mu.RUnlock()
|
||||
return r
|
||||
}
|
||||
|
||||
func (d *indirectIndex) Contains(key string) bool {
|
||||
return len(d.Entries(key)) > 0
|
||||
}
|
||||
|
@ -751,10 +804,10 @@ type mmapAccessor struct {
|
|||
|
||||
f *os.File
|
||||
b []byte
|
||||
index TSMIndex
|
||||
index *indirectIndex
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) init() (TSMIndex, error) {
|
||||
func (m *mmapAccessor) init() (*indirectIndex, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
@ -822,6 +875,7 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec
|
|||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
//TODO: Validate checksum
|
||||
a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
|
||||
if err != nil {
|
||||
|
@ -898,6 +952,8 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
tombstones := m.index.TombstoneRange(key)
|
||||
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
|
@ -905,6 +961,18 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) {
|
|||
var err error
|
||||
var values []Value
|
||||
for _, block := range blocks {
|
||||
var skip bool
|
||||
for _, t := range tombstones {
|
||||
// Should we skip this block because it contains points that have been deleted
|
||||
if t.Min <= block.MinTime && t.Max >= block.MaxTime {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
//TODO: Validate checksum
|
||||
temp = temp[:0]
|
||||
// The +4 is the 4 byte checksum length
|
||||
|
@ -912,6 +980,12 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Filter out any values that were deleted
|
||||
for _, t := range tombstones {
|
||||
temp = Values(temp).Filter(t.Min, t.Max)
|
||||
}
|
||||
|
||||
values = append(values, temp...)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package tsm1_test
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
|
@ -336,6 +337,175 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTSMReader_MMAP_TombstoneRange(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
f := MustTempFile(dir)
|
||||
defer f.Close()
|
||||
|
||||
w, err := tsm1.NewTSMWriter(f)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating writer: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{
|
||||
tsm1.NewValue(1, 1.0),
|
||||
tsm1.NewValue(2, 2.0),
|
||||
tsm1.NewValue(3, 3.0),
|
||||
}
|
||||
if err := w.Write("cpu", expValues); err != nil {
|
||||
t.Fatalf("unexpected error writing: %v", err)
|
||||
}
|
||||
|
||||
if err := w.WriteIndex(); err != nil {
|
||||
t.Fatalf("unexpected error writing index: %v", err)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
t.Fatalf("unexpected error closing: %v", err)
|
||||
}
|
||||
|
||||
f, err = os.Open(f.Name())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error open file: %v", err)
|
||||
}
|
||||
|
||||
r, err := tsm1.NewTSMReader(f)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error created reader: %v", err)
|
||||
}
|
||||
|
||||
if err := r.DeleteRange([]string{"cpu"}, 2, math.MaxInt64); err != nil {
|
||||
t.Fatalf("unexpected error deleting: %v", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
values, err := r.ReadAll("cpu")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading all: %v", err)
|
||||
}
|
||||
|
||||
if got, exp := len(values), 1; got != exp {
|
||||
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := values[0].String(), expValues[0].String(); got != exp {
|
||||
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTSMReader_MMAP_TombstoneFullRange(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
f := MustTempFile(dir)
|
||||
defer f.Close()
|
||||
|
||||
w, err := tsm1.NewTSMWriter(f)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating writer: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{
|
||||
tsm1.NewValue(1, 1.0),
|
||||
tsm1.NewValue(2, 2.0),
|
||||
tsm1.NewValue(3, 3.0),
|
||||
}
|
||||
if err := w.Write("cpu", expValues); err != nil {
|
||||
t.Fatalf("unexpected error writing: %v", err)
|
||||
}
|
||||
|
||||
if err := w.WriteIndex(); err != nil {
|
||||
t.Fatalf("unexpected error writing index: %v", err)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
t.Fatalf("unexpected error closing: %v", err)
|
||||
}
|
||||
|
||||
f, err = os.Open(f.Name())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error open file: %v", err)
|
||||
}
|
||||
|
||||
r, err := tsm1.NewTSMReader(f)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error created reader: %v", err)
|
||||
}
|
||||
|
||||
if err := r.DeleteRange([]string{"cpu"}, math.MinInt64, math.MaxInt64); err != nil {
|
||||
t.Fatalf("unexpected error deleting: %v", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
values, err := r.ReadAll("cpu")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading all: %v", err)
|
||||
}
|
||||
|
||||
if got, exp := len(values), 0; got != exp {
|
||||
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTSMReader_MMAP_TombstoneMultipleRanges(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
f := MustTempFile(dir)
|
||||
defer f.Close()
|
||||
|
||||
w, err := tsm1.NewTSMWriter(f)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating writer: %v", err)
|
||||
}
|
||||
|
||||
expValues := []tsm1.Value{
|
||||
tsm1.NewValue(1, 1.0),
|
||||
tsm1.NewValue(2, 2.0),
|
||||
tsm1.NewValue(3, 3.0),
|
||||
tsm1.NewValue(4, 4.0),
|
||||
tsm1.NewValue(5, 5.0),
|
||||
}
|
||||
if err := w.Write("cpu", expValues); err != nil {
|
||||
t.Fatalf("unexpected error writing: %v", err)
|
||||
}
|
||||
|
||||
if err := w.WriteIndex(); err != nil {
|
||||
t.Fatalf("unexpected error writing index: %v", err)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
t.Fatalf("unexpected error closing: %v", err)
|
||||
}
|
||||
|
||||
f, err = os.Open(f.Name())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error open file: %v", err)
|
||||
}
|
||||
|
||||
r, err := tsm1.NewTSMReader(f)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error created reader: %v", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if err := r.DeleteRange([]string{"cpu"}, 2, 2); err != nil {
|
||||
t.Fatalf("unexpected error deleting: %v", err)
|
||||
}
|
||||
|
||||
if err := r.DeleteRange([]string{"cpu"}, 4, 4); err != nil {
|
||||
t.Fatalf("unexpected error deleting: %v", err)
|
||||
}
|
||||
|
||||
values, err := r.ReadAll("cpu")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading all: %v", err)
|
||||
}
|
||||
|
||||
if got, exp := len(values), 3; got != exp {
|
||||
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTSMReader_MMAP_Stats(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
|
|
@ -29,7 +29,13 @@ type Tombstone struct {
|
|||
Min, Max int64
|
||||
}
|
||||
|
||||
// Add add the all keys to the tombstone
|
||||
func (t *Tombstoner) Add(keys []string) error {
|
||||
return t.AddRange(keys, math.MinInt64, math.MaxInt64)
|
||||
}
|
||||
|
||||
// AddRange adds all keys to the tombstone specifying only the data between min and max to be removed.
|
||||
func (t *Tombstoner) AddRange(keys []string, min, max int64) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
|
@ -47,8 +53,8 @@ func (t *Tombstoner) Add(keys []string) error {
|
|||
for _, k := range keys {
|
||||
tombstones = append(tombstones, Tombstone{
|
||||
Key: k,
|
||||
Min: math.MinInt64,
|
||||
Max: math.MaxInt64,
|
||||
Min: min,
|
||||
Max: max,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue