feat: TimeRangeIterator for checking if keys have data in a TSM file

The TimeRangeIterator permits linear or random index scans and
can answer whether the current key has data for the specified time
interval, considering any tombstones.

When there are no tombstones there are some opportunities for
optimization to skip decoding blocks. Specifically, if the
queried time interval overlaps any boundaries of the TSM index entries.
pull/13426/head
Stuart Carnie 2019-04-17 16:52:29 -07:00
parent 7544ea0a5b
commit 35e0094a28
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
5 changed files with 887 additions and 0 deletions

View File

@ -150,6 +150,12 @@ type TSMFile interface {
// allows sequential iteration to each and every block.
BlockIterator() *BlockIterator
// TimeRangeIterator returns an iterator over the keys, starting at the provided
// key. Calling the HasData accessor will return true if data exists for the
// interval [min, max] for the current key.
// Next must be called before calling any of the accessors.
TimeRangeIterator(key []byte, min, max int64) *TimeRangeIterator
// Free releases any resources held by the FileStore to free up system resources.
Free() error

View File

@ -456,6 +456,25 @@ func (t *TSMReader) BlockIterator() *BlockIterator {
}
}
// TimeRangeIterator returns an iterator over the keys, starting at the provided
// key. Calling the HasData accessor will return true if data exists for the
// interval [min, max] for the current key.
// Next must be called before calling any of the accessors.
func (t *TSMReader) TimeRangeIterator(key []byte, min, max int64) *TimeRangeIterator {
t.mu.RLock()
iter := t.index.Iterator(key)
t.mu.RUnlock()
return &TimeRangeIterator{
r: t,
iter: iter,
tr: TimeRange{
Min: min,
Max: max,
},
}
}
type BatchDeleter interface {
DeleteRange(keys [][]byte, min, max int64) error
Commit() error

View File

@ -0,0 +1,186 @@
package tsm1
import (
"github.com/influxdata/influxdb/tsdb"
)
// TimeRangeIterator will iterate over the keys of a TSM file, starting at
// the provided key. It is used to determine if each key has data which exists
// within a specified time interval.
type TimeRangeIterator struct {
r *TSMReader
iter *TSMIndexIterator
tr TimeRange
err error
// temporary storage
trbuf []TimeRange
buf []byte
a tsdb.TimestampArray
}
func (b *TimeRangeIterator) Err() error {
if b.err != nil {
return b.err
}
return b.iter.Err()
}
// Next advances the iterator and reports if it is still valid.
func (b *TimeRangeIterator) Next() bool {
if b.Err() != nil {
return false
}
return b.iter.Next()
}
// Seek points the iterator at the smallest key greater than or equal to the
// given key, returning true if it was an exact match. It returns false for
// ok if the key does not exist.
func (b *TimeRangeIterator) Seek(key []byte) (exact, ok bool) {
if b.Err() != nil {
return false, false
}
return b.iter.Seek(key)
}
// Key reports the current key.
func (b *TimeRangeIterator) Key() []byte {
return b.iter.Key()
}
// HasData reports true if the current key has data for the time range.
func (b *TimeRangeIterator) HasData() bool {
if b.Err() != nil {
return false
}
e := excludeEntries(b.iter.Entries(), b.tr)
if len(e) == 0 {
return false
}
b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0])
var ts []TimeRange
if len(b.trbuf) > 0 {
ts = excludeTimeRanges(b.trbuf, b.tr)
}
if len(ts) == 0 {
// no tombstones, fast path will avoid decoding blocks
// if queried time interval intersects with one of the entries
if intersectsEntry(e, b.tr) {
return true
}
for i := range e {
_, b.buf, b.err = b.r.ReadBytes(&e[i], b.buf)
if b.err != nil {
return false
}
b.err = DecodeTimestampArrayBlock(b.buf, &b.a)
if b.err != nil {
return false
}
if b.a.Contains(b.tr.Min, b.tr.Max) {
return true
}
}
} else {
for i := range e {
_, b.buf, b.err = b.r.ReadBytes(&e[i], b.buf)
if b.err != nil {
return false
}
b.err = DecodeTimestampArrayBlock(b.buf, &b.a)
if b.err != nil {
return false
}
// remove tombstoned timestamps
for i := range ts {
b.a.Exclude(ts[i].Min, ts[i].Max)
}
if b.a.Contains(b.tr.Min, b.tr.Max) {
return true
}
}
}
return false
}
/*
intersectsEntry determines whether the range [min, max]
intersects one or both boundaries of IndexEntry.
+------------------+
| IndexEntry |
+---------+------------------+---------+
| RANGE | | RANGE |
+-+-------+-+ +----+----+----+
| RANGE | | RANGE |
+----+----+-----------+---------+
| RANGE |
+--------------------------+
*/
// intersectsEntry determines if tr overlaps one or both boundaries
// of at least one element of e. If that is the case,
// and the block has no tombstones, the block timestamps do not
// need to be decoded.
func intersectsEntry(e []IndexEntry, tr TimeRange) bool {
for i := range e {
min, max := e[i].MinTime, e[i].MaxTime
if tr.Overlaps(min, max) && !tr.Within(min, max) {
return true
}
}
return false
}
// excludeEntries returns a slice which excludes leading and trailing
// elements of e that are outside the time range specified by tr.
func excludeEntries(e []IndexEntry, tr TimeRange) []IndexEntry {
for i := range e {
if e[i].OverlapsTimeRange(tr.Min, tr.Max) {
e = e[i:]
break
}
}
for i := range e {
if !e[i].OverlapsTimeRange(tr.Min, tr.Max) {
e = e[:i]
break
}
}
return e
}
// excludeTimeRanges returns a slice which excludes leading and trailing
// elements of e that are outside the time range specified by tr.
func excludeTimeRanges(e []TimeRange, tr TimeRange) []TimeRange {
for i := range e {
if e[i].Overlaps(tr.Min, tr.Max) {
e = e[i:]
break
}
}
for i := range e {
if !e[i].Overlaps(tr.Min, tr.Max) {
e = e[:i]
break
}
}
return e
}

View File

@ -0,0 +1,670 @@
package tsm1
import (
"fmt"
"os"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
func TestTimeRangeIterator(t *testing.T) {
tsm := mustWriteTSM(
bucket{
org: 0x50,
bucket: 0x60,
w: writes(
mw("cpu",
kw("tag0=val0",
vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
),
kw("tag0=val1",
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)),
),
),
),
},
bucket{
org: 0x51,
bucket: 0x61,
w: writes(
mw("mem",
kw("tag0=val0",
vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
),
kw("tag0=val1",
vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
vals(tvi(2000, 1)),
),
kw("tag0=val2",
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)),
),
),
),
},
)
defer tsm.RemoveAll()
orgBucket := func(org, bucket uint) []byte {
n := tsdb.EncodeName(influxdb.ID(org), influxdb.ID(bucket))
return n[:]
}
type args struct {
min int64
max int64
}
type res struct {
k string
hasData bool
}
EXP := func(r ...interface{}) (rr []res) {
for i := 0; i+1 < len(r); i += 2 {
rr = append(rr, res{k: r[i].(string), hasData: r[i+1].(bool)})
}
return
}
type test struct {
name string
args args
exp []res
hasData []bool
}
type bucketTest struct {
org, bucket uint
m string
tests []test
}
r := tsm.TSMReader()
runTests := func(name string, tests []bucketTest) {
t.Run(name, func(t *testing.T) {
for _, bt := range tests {
key := orgBucket(bt.org, bt.bucket)
t.Run(fmt.Sprintf("0x%x-0x%x", bt.org, bt.bucket), func(t *testing.T) {
for _, tt := range bt.tests {
t.Run(tt.name, func(t *testing.T) {
iter := r.TimeRangeIterator(key, tt.args.min, tt.args.max)
count := 0
for i, exp := range tt.exp {
if !iter.Next() {
t.Errorf("Next(%d): expected true", i)
}
expKey := makeKey(influxdb.ID(bt.org), influxdb.ID(bt.bucket), bt.m, exp.k)
if got := iter.Key(); !cmp.Equal(got, expKey) {
t.Errorf("Key(%d): -got/+exp\n%v", i, cmp.Diff(got, expKey))
}
if got := iter.HasData(); got != exp.hasData {
t.Errorf("HasData(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.hasData))
}
count++
}
if count != len(tt.exp) {
t.Errorf("count: -got/+exp\n%v", cmp.Diff(count, len(tt.exp)))
}
})
}
})
}
})
}
runTests("before delete", []bucketTest{
{
org: 0x50,
bucket: 0x60,
m: "cpu",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val0", true, "tag0=val1", true),
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: EXP("tag0=val0", true, "tag0=val1", true),
},
{
name: "to_2999",
args: args{
min: 0,
max: 2999,
},
exp: EXP("tag0=val0", true, "tag0=val1", true),
},
{
name: "intersects block",
args: args{
min: 1500,
max: 2500,
},
exp: EXP("tag0=val0", true, "tag0=val1", true),
},
},
},
{
org: 0x51,
bucket: 0x61,
m: "mem",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val0", true, "tag0=val1", true, "tag0=val2", true),
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: EXP("tag0=val0", true, "tag0=val1", false, "tag0=val2", true),
},
{
name: "1000_2999",
args: args{
min: 1000,
max: 2500,
},
exp: EXP("tag0=val0", true, "tag0=val1", true, "tag0=val2", true),
},
},
},
})
tsm.MustDeletePrefix(orgBucket(0x50, 0x60), 0, 2999)
tsm.MustDelete(makeKey(0x51, 0x61, "mem", "tag0=val0"))
tsm.MustDeleteRange(2000, 2999,
makeKey(0x51, 0x61, "mem", "tag0=val1"),
makeKey(0x51, 0x61, "mem", "tag0=val2"),
)
runTests("after delete", []bucketTest{
{
org: 0x50,
bucket: 0x60,
m: "cpu",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val1", true),
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: nil,
},
{
name: "to_2999",
args: args{
min: 0,
max: 2999,
},
exp: EXP("tag0=val1", false),
},
{
name: "intersects block",
args: args{
min: 1500,
max: 2500,
},
exp: EXP("tag0=val1", false),
},
{
name: "beyond all tombstones",
args: args{
min: 3000,
max: 4000,
},
exp: EXP("tag0=val1", true),
},
},
},
{
org: 0x51,
bucket: 0x61,
m: "mem",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val1", true, "tag0=val2", true),
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: EXP("tag0=val1", false, "tag0=val2", false),
},
{
name: "1000_2500",
args: args{
min: 1000,
max: 2500,
},
exp: EXP("tag0=val1", true, "tag0=val2", false),
},
},
},
})
}
func TestExcludeEntries(t *testing.T) {
entries := func(ts ...int64) (e []IndexEntry) {
for i := 0; i+1 < len(ts); i += 2 {
e = append(e, IndexEntry{MinTime: ts[i], MaxTime: ts[i+1]})
}
return
}
eq := func(a, b []IndexEntry) bool {
if len(a) == 0 && len(b) == 0 {
return true
}
return cmp.Equal(a, b)
}
type args struct {
e []IndexEntry
min int64
max int64
}
tests := []struct {
name string
args args
exp []IndexEntry
}{
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 11,
max: 13,
},
exp: entries(12, 15),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 10,
max: 13,
},
exp: entries(0, 10, 12, 15),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 12,
max: 30,
},
exp: entries(12, 15, 19, 21),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 0,
max: 100,
},
exp: entries(0, 10, 12, 15, 19, 21),
},
{
args: args{
e: entries(0, 10, 13, 15, 19, 21),
min: 11,
max: 12,
},
exp: entries(),
},
{
args: args{
e: entries(12, 15, 19, 21),
min: 0,
max: 9,
},
exp: entries(),
},
{
args: args{
e: entries(12, 15, 19, 21),
min: 22,
max: 30,
},
exp: entries(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := excludeEntries(tt.args.e, TimeRange{tt.args.min, tt.args.max}); !cmp.Equal(got, tt.exp, cmp.Comparer(eq)) {
t.Errorf("excludeEntries() -got/+exp\n%v", cmp.Diff(got, tt.exp))
}
})
}
}
func TestExcludeTimeRanges(t *testing.T) {
entries := func(ts ...int64) (e []TimeRange) {
for i := 0; i+1 < len(ts); i += 2 {
e = append(e, TimeRange{Min: ts[i], Max: ts[i+1]})
}
return
}
eq := func(a, b []TimeRange) bool {
if len(a) == 0 && len(b) == 0 {
return true
}
return cmp.Equal(a, b)
}
type args struct {
e []TimeRange
min int64
max int64
}
tests := []struct {
name string
args args
exp []TimeRange
}{
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 11,
max: 13,
},
exp: entries(12, 15),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 10,
max: 13,
},
exp: entries(0, 10, 12, 15),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 12,
max: 30,
},
exp: entries(12, 15, 19, 21),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 0,
max: 100,
},
exp: entries(0, 10, 12, 15, 19, 21),
},
{
args: args{
e: entries(0, 10, 13, 15, 19, 21),
min: 11,
max: 12,
},
exp: entries(),
},
{
args: args{
e: entries(12, 15, 19, 21),
min: 0,
max: 9,
},
exp: entries(),
},
{
args: args{
e: entries(12, 15, 19, 21),
min: 22,
max: 30,
},
exp: entries(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := excludeTimeRanges(tt.args.e, TimeRange{tt.args.min, tt.args.max}); !cmp.Equal(got, tt.exp, cmp.Comparer(eq)) {
t.Errorf("excludeEntries() -got/+exp\n%v", cmp.Diff(got, tt.exp))
}
})
}
}
func TestIntersectsEntries(t *testing.T) {
entries := func(ts ...int64) (e []IndexEntry) {
for i := 0; i+1 < len(ts); i += 2 {
e = append(e, IndexEntry{MinTime: ts[i], MaxTime: ts[i+1]})
}
return
}
type args struct {
e []IndexEntry
tr TimeRange
}
tests := []struct {
name string
args args
exp bool
}{
{
name: "",
args: args{
e: entries(5, 10, 13, 15, 19, 21, 22, 27),
tr: TimeRange{6, 9},
},
exp: false,
},
{
args: args{
e: entries(5, 10, 13, 15, 19, 21, 22, 27),
tr: TimeRange{11, 12},
},
exp: false,
},
{
args: args{
e: entries(5, 10, 13, 15, 19, 21, 22, 27),
tr: TimeRange{2, 4},
},
exp: false,
},
{
args: args{
e: entries(5, 10, 13, 15, 19, 21, 22, 27),
tr: TimeRange{28, 40},
},
exp: false,
},
{
args: args{
e: entries(5, 10, 13, 15, 19, 21, 22, 27),
tr: TimeRange{3, 11},
},
exp: true,
},
{
args: args{
e: entries(5, 10, 13, 15, 19, 21, 22, 27),
tr: TimeRange{5, 27},
},
exp: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := intersectsEntry(tt.args.e, tt.args.tr); got != tt.exp {
t.Errorf("excludeEntries() -got/+exp\n%v", cmp.Diff(got, tt.exp))
}
})
}
}
type bucket struct {
org, bucket influxdb.ID
w []measurementWrite
}
func writes(w ...measurementWrite) []measurementWrite {
return w
}
type measurementWrite struct {
m string
w []keyWrite
}
func mw(m string, w ...keyWrite) measurementWrite {
return measurementWrite{m, w}
}
type keyWrite struct {
k string
w []Values
}
func kw(k string, w ...Values) keyWrite { return keyWrite{k, w} }
func vals(tv ...Value) Values { return tv }
func tvi(ts int64, v int64) Value { return NewIntegerValue(ts, v) }
type tsmState struct {
dir string
file string
r *TSMReader
}
const fieldName = "v"
func makeKey(org, bucket influxdb.ID, m string, k string) []byte {
name := tsdb.EncodeName(org, bucket)
line := string(m) + "," + k
tags := make(models.Tags, 1)
tags[0] = models.NewTag(models.MeasurementTagKeyBytes, []byte(m))
tags = append(tags, models.ParseTags([]byte(line))...)
tags = append(tags, models.NewTag(models.FieldKeyTagKeyBytes, []byte(fieldName)))
return SeriesFieldKeyBytes(string(models.MakeKey(name[:], tags)), fieldName)
}
func mustWriteTSM(writes ...bucket) (s *tsmState) {
dir := mustTempDir()
defer func() {
if s == nil {
_ = os.RemoveAll(dir)
}
}()
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
panic(fmt.Sprintf("unexpected error creating writer: %v", err))
}
for _, ob := range writes {
for _, mw := range ob.w {
for _, kw := range mw.w {
key := makeKey(ob.org, ob.bucket, mw.m, kw.k)
for _, vw := range kw.w {
if err := w.Write(key, vw); err != nil {
panic(fmt.Sprintf("Write failed: %v", err))
}
}
}
}
}
if err := w.WriteIndex(); err != nil {
panic(fmt.Sprintf("WriteIndex: %v", err))
}
if err := w.Close(); err != nil {
panic(fmt.Sprintf("Close: %v", err))
}
fd, err := os.Open(f.Name())
if err != nil {
panic(fmt.Sprintf("os.Open: %v", err))
}
r, err := NewTSMReader(fd)
if err != nil {
panic(fmt.Sprintf("NewTSMReader: %v", err))
}
return &tsmState{
dir: dir,
file: f.Name(),
r: r,
}
}
func (s *tsmState) TSMReader() *TSMReader {
return s.r
}
func (s *tsmState) RemoveAll() {
_ = os.RemoveAll(s.dir)
}
func (s *tsmState) MustDeletePrefix(key []byte, min, max int64) {
err := s.r.DeletePrefix(key, min, max, nil)
if err != nil {
panic(fmt.Sprintf("DeletePrefix: %v", err))
}
}
func (s *tsmState) MustDelete(keys ...[]byte) {
err := s.r.Delete(keys)
if err != nil {
panic(fmt.Sprintf("Delete: %v", err))
}
}
func (s *tsmState) MustDeleteRange(min, max int64, keys ...[]byte) {
err := s.r.DeleteRange(keys, min, max)
if err != nil {
panic(fmt.Sprintf("DeleteRange: %v", err))
}
}

View File

@ -9,6 +9,12 @@ func (t TimeRange) Overlaps(min, max int64) bool {
return t.Min <= max && t.Max >= min
}
// Within returns true if min < t.Min and t.Max < max and therefore the interval [t.Min, t.Max] is
// contained within [min, max]
func (t TimeRange) Within(min, max int64) bool {
return min < t.Min && t.Max < max
}
func (t TimeRange) Less(o TimeRange) bool {
return t.Min < o.Min || (t.Min == o.Min && t.Max < o.Max)
}