diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 847b8af4c8..2745f53828 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -161,6 +161,12 @@ type TSMFile interface { // Next must be called before calling any of the accessors. TimeRangeIterator(key []byte, min, max int64) *TimeRangeIterator + // TimeRangeMaxTimeIterator returns an iterator over the keys, starting at the provided + // key. Calling the HasData and MaxTime accessors will be restricted to the + // interval [min, max] for the current key. + // Next must be called before calling any of the accessors. + TimeRangeMaxTimeIterator(key []byte, min, max int64) *TimeRangeMaxTimeIterator + // Free releases any resources held by the FileStore to free up system resources. Free() error diff --git a/tsdb/tsm1/reader.go b/tsdb/tsm1/reader.go index 26bc7708ea..e5e799e47a 100644 --- a/tsdb/tsm1/reader.go +++ b/tsdb/tsm1/reader.go @@ -488,11 +488,34 @@ func (t *TSMReader) TimeRangeIterator(key []byte, min, max int64) *TimeRangeIter t.mu.RUnlock() return &TimeRangeIterator{ - r: t, - iter: iter, - tr: TimeRange{ - Min: min, - Max: max, + timeRangeBlockReader: timeRangeBlockReader{ + r: t, + iter: iter, + tr: TimeRange{ + Min: min, + Max: max, + }, + }, + } +} + +// TimeRangeMaxTimeIterator returns an iterator over the keys, starting at the provided +// key. Calling the HasData and MaxTime accessors will be restricted to the +// interval [min, max] for the current key and MaxTime ≤ max. +// Next must be called before calling any of the accessors. +func (t *TSMReader) TimeRangeMaxTimeIterator(key []byte, min, max int64) *TimeRangeMaxTimeIterator { + t.mu.RLock() + iter := t.index.Iterator(key) + t.mu.RUnlock() + + return &TimeRangeMaxTimeIterator{ + timeRangeBlockReader: timeRangeBlockReader{ + r: t, + iter: iter, + tr: TimeRange{ + Min: min, + Max: max, + }, }, } } diff --git a/tsdb/tsm1/reader_range_iterator.go b/tsdb/tsm1/reader_range_iterator.go index 577be69822..e708a97e61 100644 --- a/tsdb/tsm1/reader_range_iterator.go +++ b/tsdb/tsm1/reader_range_iterator.go @@ -8,23 +8,7 @@ import ( // the provided key. It is used to determine if each key has data which exists // within a specified time interval. type TimeRangeIterator struct { - r *TSMReader - iter *TSMIndexIterator - tr TimeRange - err error - stats cursors.CursorStats - - // temporary storage - trbuf []TimeRange - buf []byte - a cursors.TimestampArray -} - -func (b *TimeRangeIterator) Err() error { - if b.err != nil { - return b.err - } - return b.iter.Err() + timeRangeBlockReader } // Next advances the iterator and reports if it is still valid. @@ -47,67 +31,98 @@ func (b *TimeRangeIterator) Seek(key []byte) (exact, ok bool) { return b.iter.Seek(key) } -// Key reports the current key. -func (b *TimeRangeIterator) Key() []byte { - return b.iter.Key() -} - // HasData reports true if the current key has data for the time range. func (b *TimeRangeIterator) HasData() bool { if b.Err() != nil { return false } - e := excludeEntries(b.iter.Entries(), b.tr) + e, ts := b.getEntriesAndTombstones() if len(e) == 0 { return false } - b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0]) - var ts []TimeRange - if len(b.trbuf) > 0 { - ts = excludeTimeRanges(b.trbuf, b.tr) - } - if len(ts) == 0 { // no tombstones, fast path will avoid decoding blocks // if queried time interval intersects with one of the entries if intersectsEntry(e, b.tr) { return true } + } - for i := range e { - if !b.readBlock(&e[i]) { - return false - } - - if b.a.Contains(b.tr.Min, b.tr.Max) { - return true - } + for i := range e { + if !b.readBlock(&e[i]) { + return false } - } else { - for i := range e { - if !b.readBlock(&e[i]) { - return false - } - // remove tombstoned timestamps - for i := range ts { - b.a.Exclude(ts[i].Min, ts[i].Max) - } + // remove tombstoned timestamps + for i := range ts { + b.a.Exclude(ts[i].Min, ts[i].Max) + } - if b.a.Contains(b.tr.Min, b.tr.Max) { - return true - } + if b.a.Contains(b.tr.Min, b.tr.Max) { + return true } } return false } +// The timeRangeBlockReader provides common behavior +// for enumerating keys over a given time range and +// accumulating statistics. +type timeRangeBlockReader struct { + r *TSMReader + iter *TSMIndexIterator + tr TimeRange + err error + stats cursors.CursorStats + + // temporary storage + trbuf []TimeRange + buf []byte + a cursors.TimestampArray +} + +func (b *timeRangeBlockReader) Err() error { + if b.err != nil { + return b.err + } + return b.iter.Err() +} + +// Key reports the current key. +func (b *timeRangeBlockReader) Key() []byte { + return b.iter.Key() +} + +// Type reports the current block type. +func (b *timeRangeBlockReader) Type() byte { + return b.iter.Type() +} + +func (b *timeRangeBlockReader) getEntriesAndTombstones() ([]IndexEntry, []TimeRange) { + if b.err != nil { + return nil, nil + } + + e := excludeEntries(b.iter.Entries(), b.tr) + if len(e) == 0 { + return nil, nil + } + + b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0]) + var ts []TimeRange + if len(b.trbuf) > 0 { + ts = excludeTimeRanges(b.trbuf, b.tr) + } + + return e, ts +} + // readBlock reads the block identified by IndexEntry e and accumulates // statistics. readBlock returns true on success. -func (b *TimeRangeIterator) readBlock(e *IndexEntry) bool { +func (b *timeRangeBlockReader) readBlock(e *IndexEntry) bool { _, b.buf, b.err = b.r.ReadBytes(e, b.buf) if b.err != nil { return false @@ -124,7 +139,7 @@ func (b *TimeRangeIterator) readBlock(e *IndexEntry) bool { } // Stats returns statistics accumulated by the iterator for any block reads. -func (b *TimeRangeIterator) Stats() cursors.CursorStats { +func (b *timeRangeBlockReader) Stats() cursors.CursorStats { return b.stats } diff --git a/tsdb/tsm1/reader_range_iterator_test.go b/tsdb/tsm1/reader_range_iterator_test.go index cf515182a7..d5aebdf6e7 100644 --- a/tsdb/tsm1/reader_range_iterator_test.go +++ b/tsdb/tsm1/reader_range_iterator_test.go @@ -358,6 +358,14 @@ func TestExcludeEntries(t *testing.T) { }, exp: entries(12, 15, 19, 21), }, + { + args: args{ + e: entries(0, 10, 12, 15, 19, 21), + min: 13, + max: 20, + }, + exp: entries(12, 15, 19, 21), + }, { args: args{ e: entries(0, 10, 12, 15, 19, 21), diff --git a/tsdb/tsm1/reader_range_maxtime_iterator.go b/tsdb/tsm1/reader_range_maxtime_iterator.go new file mode 100644 index 0000000000..178c6156b3 --- /dev/null +++ b/tsdb/tsm1/reader_range_maxtime_iterator.go @@ -0,0 +1,141 @@ +package tsm1 + +import ( + "github.com/influxdata/influxdb/v2/models" +) + +const ( + // InvalidMinNanoTime is an invalid nano timestamp that has an ordinal + // value lower than models.MinNanoTime, the minimum valid timestamp + // that can be represented. + InvalidMinNanoTime = models.MinNanoTime - 1 +) + +// TimeRangeMaxTimeIterator will iterate over the keys of a TSM file, starting at +// the provided key. It is used to determine if each key has data which exists +// within a specified time interval. +type TimeRangeMaxTimeIterator struct { + timeRangeBlockReader + + // cached values + maxTime int64 + hasData bool + isLoaded bool +} + +// Next advances the iterator and reports if it is still valid. +func (b *TimeRangeMaxTimeIterator) Next() bool { + if b.Err() != nil { + return false + } + + b.clearIsLoaded() + + return b.iter.Next() +} + +// Seek points the iterator at the smallest key greater than or equal to the +// given key, returning true if it was an exact match. It returns false for +// ok if the key does not exist. +func (b *TimeRangeMaxTimeIterator) Seek(key []byte) (exact, ok bool) { + if b.Err() != nil { + return false, false + } + + b.clearIsLoaded() + + return b.iter.Seek(key) +} + +// HasData reports true if the current key has data for the time range. +func (b *TimeRangeMaxTimeIterator) HasData() bool { + if b.Err() != nil { + return false + } + + b.load() + + return b.hasData +} + +// MaxTime returns the maximum timestamp for the current key within the +// requested time range. If an error occurred or there is no data, +// InvalidMinTimeStamp will be returned, which is less than models.MinTimeStamp. +// This property can be leveraged when enumerating keys to find the maximum timestamp, +// as this value will always be lower than any valid timestamp returned. +// +// NOTE: If MaxTime is equal to the upper bounds of the queried time range, it +// means data was found equal to or beyond the requested time range and +// does not mean that data exists at that specific timestamp. +func (b *TimeRangeMaxTimeIterator) MaxTime() int64 { + if b.Err() != nil { + return InvalidMinNanoTime + } + + b.load() + + return b.maxTime +} + +func (b *TimeRangeMaxTimeIterator) clearIsLoaded() { b.isLoaded = false } + +// setMaxTime sets maxTime = min(b.tr.Max, max) and +// returns true if maxTime == b.tr.Max, indicating +// the iterator has reached the upper bound. +func (b *TimeRangeMaxTimeIterator) setMaxTime(max int64) bool { + if max > b.tr.Max { + b.maxTime = b.tr.Max + return true + } + b.maxTime = max + return false +} + +func (b *TimeRangeMaxTimeIterator) load() { + if b.isLoaded { + return + } + + b.isLoaded = true + b.hasData = false + b.maxTime = InvalidMinNanoTime + + e, ts := b.getEntriesAndTombstones() + if len(e) == 0 { + return + } + + if len(ts) == 0 { + // no tombstones, fast path will avoid decoding blocks + // if queried time interval intersects with one of the entries + if intersectsEntry(e, b.tr) { + b.hasData = true + b.setMaxTime(e[len(e)-1].MaxTime) + return + } + } + + for i := range e { + if !b.readBlock(&e[i]) { + goto ERROR + } + + // remove tombstoned timestamps + for i := range ts { + b.a.Exclude(ts[i].Min, ts[i].Max) + } + + if b.a.Contains(b.tr.Min, b.tr.Max) { + b.hasData = true + if b.setMaxTime(b.a.MaxTime()) { + return + } + } + } + + return +ERROR: + // ERROR ensures cached state is set to invalid values + b.hasData = false + b.maxTime = InvalidMinNanoTime +} diff --git a/tsdb/tsm1/reader_range_maxtime_iterator_test.go b/tsdb/tsm1/reader_range_maxtime_iterator_test.go new file mode 100644 index 0000000000..7d6e9c73f5 --- /dev/null +++ b/tsdb/tsm1/reader_range_maxtime_iterator_test.go @@ -0,0 +1,313 @@ +package tsm1 + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/tsdb" + "github.com/influxdata/influxdb/v2/tsdb/cursors" +) + +func TestTimeRangeMaxTimeIterator(t *testing.T) { + tsm := mustWriteTSM( + bucket{ + org: 0x50, + bucket: 0x60, + w: writes( + mw("cpu", + kw("tag0=val0", + vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)), + vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)), + ), + kw("tag0=val1", + vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)), + vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)), + ), + ), + ), + }, + + bucket{ + org: 0x51, + bucket: 0x61, + w: writes( + mw("mem", + kw("tag0=val0", + vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)), + vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)), + ), + kw("tag0=val1", + vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)), + vals(tvi(2000, 1)), + ), + kw("tag0=val2", + vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)), + vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)), + ), + ), + ), + }, + ) + defer tsm.RemoveAll() + + orgBucket := func(org, bucket uint) []byte { + n := tsdb.EncodeName(influxdb.ID(org), influxdb.ID(bucket)) + return n[:] + } + + type args struct { + min int64 + max int64 + } + + type res struct { + k string + hasData bool + maxTime int64 + } + + EXP := func(r ...interface{}) (rr []res) { + for i := 0; i+2 < len(r); i += 3 { + rr = append(rr, res{k: r[i].(string), hasData: r[i+1].(bool), maxTime: int64(r[i+2].(int))}) + } + return + } + + type test struct { + name string + args args + exp []res + expStats cursors.CursorStats + } + + type bucketTest struct { + org, bucket uint + m string + tests []test + } + + r := tsm.TSMReader() + + runTests := func(name string, tests []bucketTest) { + t.Run(name, func(t *testing.T) { + for _, bt := range tests { + key := orgBucket(bt.org, bt.bucket) + t.Run(fmt.Sprintf("0x%x-0x%x", bt.org, bt.bucket), func(t *testing.T) { + for _, tt := range bt.tests { + t.Run(tt.name, func(t *testing.T) { + iter := r.TimeRangeMaxTimeIterator(key, tt.args.min, tt.args.max) + count := 0 + for i, exp := range tt.exp { + if !iter.Next() { + t.Errorf("Next(%d): expected true", i) + } + + expKey := makeKey(influxdb.ID(bt.org), influxdb.ID(bt.bucket), bt.m, exp.k) + if got := iter.Key(); !cmp.Equal(got, expKey) { + t.Errorf("Key(%d): -got/+exp\n%v", i, cmp.Diff(got, expKey)) + } + + if got := iter.HasData(); got != exp.hasData { + t.Errorf("HasData(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.hasData)) + } + + if got := iter.MaxTime(); got != exp.maxTime { + t.Errorf("MaxTime(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.maxTime)) + } + count++ + } + if count != len(tt.exp) { + t.Errorf("count: -got/+exp\n%v", cmp.Diff(count, len(tt.exp))) + } + + if got := iter.Stats(); !cmp.Equal(got, tt.expStats) { + t.Errorf("Stats: -got/+exp\n%v", cmp.Diff(got, tt.expStats)) + } + }) + + } + }) + } + }) + } + + runTests("before delete", []bucketTest{ + { + org: 0x50, + bucket: 0x60, + m: "cpu", + tests: []test{ + { + name: "cover file", + args: args{ + min: 900, + max: 10000, + }, + exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 3020), + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + { + name: "within block", + args: args{ + min: 2001, + max: 2011, + }, + exp: EXP("tag0=val0", true, 2011, "tag0=val1", true, 2011), + expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48}, + }, + { + name: "to_2999", + args: args{ + min: 0, + max: 2999, + }, + exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2020), + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + { + name: "intersects block", + args: args{ + min: 1500, + max: 2500, + }, + exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2020), + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + }, + }, + + { + org: 0x51, + bucket: 0x61, + m: "mem", + tests: []test{ + { + name: "cover file", + args: args{ + min: 900, + max: 10000, + }, + exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2000, "tag0=val2", true, 3020), + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + { + name: "within block", + args: args{ + min: 2001, + max: 2011, + }, + exp: EXP("tag0=val0", true, 2011, "tag0=val1", false, int(InvalidMinNanoTime), "tag0=val2", true, 2011), + expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48}, + }, + { + name: "1000_2999", + args: args{ + min: 1000, + max: 2500, + }, + exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2000, "tag0=val2", true, 2020), + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + }, + }, + }) + + tsm.MustDeletePrefix(orgBucket(0x50, 0x60), 0, 2999) + tsm.MustDelete(makeKey(0x51, 0x61, "mem", "tag0=val0")) + tsm.MustDeleteRange(2000, 2999, + makeKey(0x51, 0x61, "mem", "tag0=val1"), + makeKey(0x51, 0x61, "mem", "tag0=val2"), + ) + + runTests("after delete", []bucketTest{ + { + org: 0x50, + bucket: 0x60, + m: "cpu", + tests: []test{ + { + name: "cover file", + args: args{ + min: 900, + max: 10000, + }, + exp: EXP("tag0=val1", true, 3020), + expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48}, + }, + { + name: "within block", + args: args{ + min: 2001, + max: 2011, + }, + exp: nil, + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + { + name: "to_2999", + args: args{ + min: 0, + max: 2999, + }, + exp: EXP("tag0=val1", false, int(InvalidMinNanoTime)), + expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24}, + }, + { + name: "intersects block", + args: args{ + min: 1500, + max: 2500, + }, + exp: EXP("tag0=val1", false, int(InvalidMinNanoTime)), + expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24}, + }, + { + name: "beyond all tombstones", + args: args{ + min: 3000, + max: 4000, + }, + exp: EXP("tag0=val1", true, 3020), + expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0}, + }, + }, + }, + + { + org: 0x51, + bucket: 0x61, + m: "mem", + tests: []test{ + { + name: "cover file", + args: args{ + min: 900, + max: 10000, + }, + exp: EXP("tag0=val1", true, 1020, "tag0=val2", true, 3020), + expStats: cursors.CursorStats{ScannedValues: 10, ScannedBytes: 80}, + }, + { + name: "within block", + args: args{ + min: 2001, + max: 2011, + }, + exp: EXP("tag0=val1", false, int(InvalidMinNanoTime), "tag0=val2", false, int(InvalidMinNanoTime)), + expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24}, + }, + { + name: "1000_2500", + args: args{ + min: 1000, + max: 2500, + }, + exp: EXP("tag0=val1", true, 1020, "tag0=val2", false, int(InvalidMinNanoTime)), + expStats: cursors.CursorStats{ScannedValues: 7, ScannedBytes: 56}, + }, + }, + }, + }) +}