refactor(tsm1): Add TimeRangeMaxTimeIterator

This commit introduces a new API for finding the maximum
timestamp of a series when iterating over the keys in a
set of TSM files.

This API will be used to determine the field type of a single
field key by selecting the series with the maximum timestamp.

This commit also refactors the common functionality for iterating
over TSM keys into `timeRangeBlockReader`, which is shared by
`TimeRangeIterator` and `TimeRangeMaxTimeIterator`.
pull/17589/head
Stuart Carnie 2020-04-08 16:05:19 -07:00
parent 77bb23fd38
commit 31df76e1e9
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
6 changed files with 562 additions and 56 deletions

View File

@ -161,6 +161,12 @@ type TSMFile interface {
// Next must be called before calling any of the accessors.
TimeRangeIterator(key []byte, min, max int64) *TimeRangeIterator
// TimeRangeMaxTimeIterator returns an iterator over the keys, starting at the provided
// key. Calling the HasData and MaxTime accessors will be restricted to the
// interval [min, max] for the current key.
// Next must be called before calling any of the accessors.
TimeRangeMaxTimeIterator(key []byte, min, max int64) *TimeRangeMaxTimeIterator
// Free releases any resources held by the FileStore to free up system resources.
Free() error

View File

@ -488,11 +488,34 @@ func (t *TSMReader) TimeRangeIterator(key []byte, min, max int64) *TimeRangeIter
t.mu.RUnlock()
return &TimeRangeIterator{
r: t,
iter: iter,
tr: TimeRange{
Min: min,
Max: max,
timeRangeBlockReader: timeRangeBlockReader{
r: t,
iter: iter,
tr: TimeRange{
Min: min,
Max: max,
},
},
}
}
// TimeRangeMaxTimeIterator returns an iterator over the keys, starting at the
// provided key. The HasData and MaxTime accessors are restricted to the
// interval [min, max] for the current key, and MaxTime never exceeds max.
// Next must be called before calling any of the accessors.
func (t *TSMReader) TimeRangeMaxTimeIterator(key []byte, min, max int64) *TimeRangeMaxTimeIterator {
	t.mu.RLock()
	iter := t.index.Iterator(key)
	t.mu.RUnlock()

	blockReader := timeRangeBlockReader{
		r:    t,
		iter: iter,
		tr:   TimeRange{Min: min, Max: max},
	}
	return &TimeRangeMaxTimeIterator{timeRangeBlockReader: blockReader}
}

View File

@ -8,23 +8,7 @@ import (
// the provided key. It is used to determine if each key has data which exists
// within a specified time interval.
type TimeRangeIterator struct {
r *TSMReader
iter *TSMIndexIterator
tr TimeRange
err error
stats cursors.CursorStats
// temporary storage
trbuf []TimeRange
buf []byte
a cursors.TimestampArray
}
func (b *TimeRangeIterator) Err() error {
if b.err != nil {
return b.err
}
return b.iter.Err()
timeRangeBlockReader
}
// Next advances the iterator and reports if it is still valid.
@ -47,67 +31,98 @@ func (b *TimeRangeIterator) Seek(key []byte) (exact, ok bool) {
return b.iter.Seek(key)
}
// Key reports the current key.
func (b *TimeRangeIterator) Key() []byte {
return b.iter.Key()
}
// HasData reports true if the current key has data for the time range.
func (b *TimeRangeIterator) HasData() bool {
if b.Err() != nil {
return false
}
e := excludeEntries(b.iter.Entries(), b.tr)
e, ts := b.getEntriesAndTombstones()
if len(e) == 0 {
return false
}
b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0])
var ts []TimeRange
if len(b.trbuf) > 0 {
ts = excludeTimeRanges(b.trbuf, b.tr)
}
if len(ts) == 0 {
// no tombstones, fast path will avoid decoding blocks
// if queried time interval intersects with one of the entries
if intersectsEntry(e, b.tr) {
return true
}
}
for i := range e {
if !b.readBlock(&e[i]) {
return false
}
if b.a.Contains(b.tr.Min, b.tr.Max) {
return true
}
for i := range e {
if !b.readBlock(&e[i]) {
return false
}
} else {
for i := range e {
if !b.readBlock(&e[i]) {
return false
}
// remove tombstoned timestamps
for i := range ts {
b.a.Exclude(ts[i].Min, ts[i].Max)
}
// remove tombstoned timestamps
for i := range ts {
b.a.Exclude(ts[i].Min, ts[i].Max)
}
if b.a.Contains(b.tr.Min, b.tr.Max) {
return true
}
if b.a.Contains(b.tr.Min, b.tr.Max) {
return true
}
}
return false
}
// The timeRangeBlockReader provides common behavior
// for enumerating keys over a given time range and
// accumulating statistics.
type timeRangeBlockReader struct {
	r     *TSMReader          // source TSM reader; used for block reads and tombstone lookups
	iter  *TSMIndexIterator   // iterator over the TSM index keys
	tr    TimeRange           // queried time interval [Min, Max]
	err   error               // first error encountered; sticky once set
	stats cursors.CursorStats // statistics accumulated by readBlock

	// temporary storage, reused across keys to avoid reallocation
	trbuf []TimeRange            // scratch buffer for tombstone ranges
	buf   []byte                 // scratch buffer for raw block bytes
	a     cursors.TimestampArray // decoded timestamps of the current block
}
// Err returns the first error recorded by the reader, falling back to any
// error reported by the underlying index iterator.
func (b *timeRangeBlockReader) Err() error {
	if err := b.err; err != nil {
		return err
	}
	return b.iter.Err()
}
// Key reports the current key.
// It delegates to the underlying TSM index iterator.
func (b *timeRangeBlockReader) Key() []byte {
	return b.iter.Key()
}
// Type reports the current block type.
// It delegates to the underlying TSM index iterator.
func (b *timeRangeBlockReader) Type() byte {
	return b.iter.Type()
}
// getEntriesAndTombstones returns the index entries for the current key that
// overlap the queried time range, together with any tombstone ranges that
// intersect it. Both results are nil when an error is pending or when no
// entries overlap the range.
func (b *timeRangeBlockReader) getEntriesAndTombstones() ([]IndexEntry, []TimeRange) {
	if b.err != nil {
		return nil, nil
	}

	entries := excludeEntries(b.iter.Entries(), b.tr)
	if len(entries) == 0 {
		return nil, nil
	}

	// Fetch tombstones for this key, reusing the scratch buffer.
	b.trbuf = b.r.TombstoneRange(b.iter.Key(), b.trbuf[:0])
	var tombstones []TimeRange
	if len(b.trbuf) > 0 {
		tombstones = excludeTimeRanges(b.trbuf, b.tr)
	}
	return entries, tombstones
}
// readBlock reads the block identified by IndexEntry e and accumulates
// statistics. readBlock returns true on success.
func (b *TimeRangeIterator) readBlock(e *IndexEntry) bool {
func (b *timeRangeBlockReader) readBlock(e *IndexEntry) bool {
_, b.buf, b.err = b.r.ReadBytes(e, b.buf)
if b.err != nil {
return false
@ -124,7 +139,7 @@ func (b *TimeRangeIterator) readBlock(e *IndexEntry) bool {
}
// Stats returns statistics accumulated by the iterator for any block reads.
func (b *TimeRangeIterator) Stats() cursors.CursorStats {
func (b *timeRangeBlockReader) Stats() cursors.CursorStats {
return b.stats
}

View File

@ -358,6 +358,14 @@ func TestExcludeEntries(t *testing.T) {
},
exp: entries(12, 15, 19, 21),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),
min: 13,
max: 20,
},
exp: entries(12, 15, 19, 21),
},
{
args: args{
e: entries(0, 10, 12, 15, 19, 21),

View File

@ -0,0 +1,141 @@
package tsm1
import (
"github.com/influxdata/influxdb/v2/models"
)
const (
	// InvalidMinNanoTime is an invalid nano timestamp that has an ordinal
	// value lower than models.MinNanoTime, the minimum valid timestamp
	// that can be represented. It serves as the "no data / error" sentinel
	// returned by TimeRangeMaxTimeIterator.MaxTime.
	InvalidMinNanoTime = models.MinNanoTime - 1
)
// TimeRangeMaxTimeIterator will iterate over the keys of a TSM file, starting at
// the provided key. It is used to determine if each key has data which exists
// within a specified time interval.
type TimeRangeMaxTimeIterator struct {
	timeRangeBlockReader

	// cached values, computed lazily by load for the current key and
	// invalidated by clearIsLoaded when the iterator advances
	maxTime int64
	hasData bool
	// isLoaded reports whether maxTime and hasData reflect the current key
	isLoaded bool
}
// Next advances the iterator and reports whether it remains valid.
func (b *TimeRangeMaxTimeIterator) Next() bool {
	if err := b.Err(); err != nil {
		return false
	}
	// Cached HasData / MaxTime values belong to the previous key.
	b.clearIsLoaded()
	return b.iter.Next()
}
// Seek positions the iterator at the smallest key greater than or equal to
// the given key, reporting via exact whether the match was exact. ok is
// false when no such key exists.
func (b *TimeRangeMaxTimeIterator) Seek(key []byte) (exact, ok bool) {
	if err := b.Err(); err != nil {
		return false, false
	}
	// Cached HasData / MaxTime values belong to the previous key.
	b.clearIsLoaded()
	return b.iter.Seek(key)
}
// HasData reports whether the current key has any data within the queried
// time range.
func (b *TimeRangeMaxTimeIterator) HasData() bool {
	if err := b.Err(); err != nil {
		return false
	}
	b.load()
	return b.hasData
}
// MaxTime returns the maximum timestamp of the current key within the
// requested time range. If an error occurred or the key has no data,
// InvalidMinNanoTime is returned, which is less than models.MinNanoTime.
// Callers enumerating keys to find the greatest timestamp can rely on this
// sentinel always comparing lower than any valid timestamp.
//
// NOTE: a result equal to the upper bound of the queried time range means
// data was found at or beyond the requested range; it does not imply data
// exists at that exact timestamp.
func (b *TimeRangeMaxTimeIterator) MaxTime() int64 {
	if err := b.Err(); err != nil {
		return InvalidMinNanoTime
	}
	b.load()
	return b.maxTime
}
func (b *TimeRangeMaxTimeIterator) clearIsLoaded() { b.isLoaded = false }
// setMaxTime caches min(max, b.tr.Max) as the maximum timestamp and reports
// whether max was clamped — i.e. the data extends beyond the upper bound of
// the queried time range, so the iterator cannot find a larger value.
func (b *TimeRangeMaxTimeIterator) setMaxTime(max int64) bool {
	if max <= b.tr.Max {
		b.maxTime = max
		return false
	}
	b.maxTime = b.tr.Max
	return true
}
// load computes and caches hasData and maxTime for the current key. It is
// idempotent per key: repeat calls are no-ops until clearIsLoaded resets
// isLoaded (done by Next and Seek).
func (b *TimeRangeMaxTimeIterator) load() {
	if b.isLoaded {
		return
	}
	// Mark loaded up front and start from the "no data" state so every exit
	// path below leaves the cache consistent.
	b.isLoaded = true
	b.hasData = false
	b.maxTime = InvalidMinNanoTime
	e, ts := b.getEntriesAndTombstones()
	if len(e) == 0 {
		// No index entries overlap the time range (or an error is pending).
		return
	}
	if len(ts) == 0 {
		// no tombstones, fast path will avoid decoding blocks
		// if queried time interval intersects with one of the entries
		if intersectsEntry(e, b.tr) {
			b.hasData = true
			// Uses the last entry's MaxTime — NOTE(review): presumes index
			// entries are ordered by time; confirm against index contract.
			b.setMaxTime(e[len(e)-1].MaxTime)
			return
		}
	}
	// Slow path: decode each block and inspect timestamps directly.
	for i := range e {
		if !b.readBlock(&e[i]) {
			goto ERROR
		}
		// remove tombstoned timestamps
		// (the inner i deliberately shadows the outer loop variable)
		for i := range ts {
			b.a.Exclude(ts[i].Min, ts[i].Max)
		}
		if b.a.Contains(b.tr.Min, b.tr.Max) {
			b.hasData = true
			// Stop early once maxTime has been clamped to the upper bound;
			// later blocks cannot raise it further.
			if b.setMaxTime(b.a.MaxTime()) {
				return
			}
		}
	}
	return
ERROR:
	// ERROR ensures cached state is set to invalid values
	b.hasData = false
	b.maxTime = InvalidMinNanoTime
}

View File

@ -0,0 +1,313 @@
package tsm1
import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
func TestTimeRangeMaxTimeIterator(t *testing.T) {
tsm := mustWriteTSM(
bucket{
org: 0x50,
bucket: 0x60,
w: writes(
mw("cpu",
kw("tag0=val0",
vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
),
kw("tag0=val1",
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)),
),
),
),
},
bucket{
org: 0x51,
bucket: 0x61,
w: writes(
mw("mem",
kw("tag0=val0",
vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
),
kw("tag0=val1",
vals(tvi(1000, 1), tvi(1010, 2), tvi(1020, 3)),
vals(tvi(2000, 1)),
),
kw("tag0=val2",
vals(tvi(2000, 1), tvi(2010, 2), tvi(2020, 3)),
vals(tvi(3000, 1), tvi(3010, 2), tvi(3020, 3)),
),
),
),
},
)
defer tsm.RemoveAll()
orgBucket := func(org, bucket uint) []byte {
n := tsdb.EncodeName(influxdb.ID(org), influxdb.ID(bucket))
return n[:]
}
type args struct {
min int64
max int64
}
type res struct {
k string
hasData bool
maxTime int64
}
EXP := func(r ...interface{}) (rr []res) {
for i := 0; i+2 < len(r); i += 3 {
rr = append(rr, res{k: r[i].(string), hasData: r[i+1].(bool), maxTime: int64(r[i+2].(int))})
}
return
}
type test struct {
name string
args args
exp []res
expStats cursors.CursorStats
}
type bucketTest struct {
org, bucket uint
m string
tests []test
}
r := tsm.TSMReader()
runTests := func(name string, tests []bucketTest) {
t.Run(name, func(t *testing.T) {
for _, bt := range tests {
key := orgBucket(bt.org, bt.bucket)
t.Run(fmt.Sprintf("0x%x-0x%x", bt.org, bt.bucket), func(t *testing.T) {
for _, tt := range bt.tests {
t.Run(tt.name, func(t *testing.T) {
iter := r.TimeRangeMaxTimeIterator(key, tt.args.min, tt.args.max)
count := 0
for i, exp := range tt.exp {
if !iter.Next() {
t.Errorf("Next(%d): expected true", i)
}
expKey := makeKey(influxdb.ID(bt.org), influxdb.ID(bt.bucket), bt.m, exp.k)
if got := iter.Key(); !cmp.Equal(got, expKey) {
t.Errorf("Key(%d): -got/+exp\n%v", i, cmp.Diff(got, expKey))
}
if got := iter.HasData(); got != exp.hasData {
t.Errorf("HasData(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.hasData))
}
if got := iter.MaxTime(); got != exp.maxTime {
t.Errorf("MaxTime(%d): -got/+exp\n%v", i, cmp.Diff(got, exp.maxTime))
}
count++
}
if count != len(tt.exp) {
t.Errorf("count: -got/+exp\n%v", cmp.Diff(count, len(tt.exp)))
}
if got := iter.Stats(); !cmp.Equal(got, tt.expStats) {
t.Errorf("Stats: -got/+exp\n%v", cmp.Diff(got, tt.expStats))
}
})
}
})
}
})
}
runTests("before delete", []bucketTest{
{
org: 0x50,
bucket: 0x60,
m: "cpu",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 3020),
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: EXP("tag0=val0", true, 2011, "tag0=val1", true, 2011),
expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48},
},
{
name: "to_2999",
args: args{
min: 0,
max: 2999,
},
exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2020),
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
{
name: "intersects block",
args: args{
min: 1500,
max: 2500,
},
exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2020),
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
},
},
{
org: 0x51,
bucket: 0x61,
m: "mem",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2000, "tag0=val2", true, 3020),
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: EXP("tag0=val0", true, 2011, "tag0=val1", false, int(InvalidMinNanoTime), "tag0=val2", true, 2011),
expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48},
},
{
name: "1000_2999",
args: args{
min: 1000,
max: 2500,
},
exp: EXP("tag0=val0", true, 2020, "tag0=val1", true, 2000, "tag0=val2", true, 2020),
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
},
},
})
tsm.MustDeletePrefix(orgBucket(0x50, 0x60), 0, 2999)
tsm.MustDelete(makeKey(0x51, 0x61, "mem", "tag0=val0"))
tsm.MustDeleteRange(2000, 2999,
makeKey(0x51, 0x61, "mem", "tag0=val1"),
makeKey(0x51, 0x61, "mem", "tag0=val2"),
)
runTests("after delete", []bucketTest{
{
org: 0x50,
bucket: 0x60,
m: "cpu",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val1", true, 3020),
expStats: cursors.CursorStats{ScannedValues: 6, ScannedBytes: 48},
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: nil,
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
{
name: "to_2999",
args: args{
min: 0,
max: 2999,
},
exp: EXP("tag0=val1", false, int(InvalidMinNanoTime)),
expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24},
},
{
name: "intersects block",
args: args{
min: 1500,
max: 2500,
},
exp: EXP("tag0=val1", false, int(InvalidMinNanoTime)),
expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24},
},
{
name: "beyond all tombstones",
args: args{
min: 3000,
max: 4000,
},
exp: EXP("tag0=val1", true, 3020),
expStats: cursors.CursorStats{ScannedValues: 0, ScannedBytes: 0},
},
},
},
{
org: 0x51,
bucket: 0x61,
m: "mem",
tests: []test{
{
name: "cover file",
args: args{
min: 900,
max: 10000,
},
exp: EXP("tag0=val1", true, 1020, "tag0=val2", true, 3020),
expStats: cursors.CursorStats{ScannedValues: 10, ScannedBytes: 80},
},
{
name: "within block",
args: args{
min: 2001,
max: 2011,
},
exp: EXP("tag0=val1", false, int(InvalidMinNanoTime), "tag0=val2", false, int(InvalidMinNanoTime)),
expStats: cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24},
},
{
name: "1000_2500",
args: args{
min: 1000,
max: 2500,
},
exp: EXP("tag0=val1", true, 1020, "tag0=val2", false, int(InvalidMinNanoTime)),
expStats: cursors.CursorStats{ScannedValues: 7, ScannedBytes: 56},
},
},
},
})
}