feat: Teach Values how to determine it contains data for a time interval

Add a Contains API which is a peer to the TimestampArray.Contains
function. This is used by the schema APIs to determine if data exists
in the cache for a given key and time interval.
pull/13426/head
Stuart Carnie 2019-04-17 16:23:40 -07:00
parent 1ddd0445d8
commit 7544ea0a5b
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
2 changed files with 68 additions and 0 deletions

View File

@ -114,6 +114,29 @@ func (a Values) Encode(buf []byte) ([]byte, error) {
return nil, fmt.Errorf("unsupported value type %T", a[0])
}
// Contains returns true if values exist for the time interval [min, max]
// inclusive. The values must be deduplicated and sorted before calling
// Contains or the results are undefined.
func (a Values) Contains(min, max int64) bool {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
return false
}
// a[rmin].UnixNano() ≥ min
// a[rmax].UnixNano() ≥ max
if a[rmin].UnixNano() == min {
return true
}
if rmax < a.Len() && a[rmax].UnixNano() == max {
return true
}
return rmax-rmin > 0
}
// InfluxQLType returns the influxql.DataType the values map to.
func (a Values) InfluxQLType() (influxql.DataType, error) {
if len(a) == 0 {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/tsdb/tsm1"
)
@ -569,6 +570,50 @@ func TestValues_MergeFloat(t *testing.T) {
}
}
func TestValues_Contains(t *testing.T) {
makeValues := func(count int, min, max int64) tsm1.Values {
vals := make(tsm1.Values, count)
ts := min
inc := (max - min) / int64(count)
for i := 0; i < count; i++ {
vals[i] = tsm1.NewRawIntegerValue(ts, 0)
ts += inc
}
return vals
}
cases := []struct {
n string
min, max int64
exp bool
}{
{"no/lo", 0, 9, false},
{"no/hi", 19, 30, false},
{"no/middle", 13, 13, false},
{"yes/first", 0, 10, true},
{"yes/first-eq", 10, 10, true},
{"yes/last", 18, 20, true},
{"yes/last-eq", 18, 18, true},
{"yes/all but first and last", 12, 16, true},
{"yes/middle-eq", 14, 14, true},
{"yes/middle-overlap", 13, 15, true},
{"yes/covers", 8, 22, true},
}
for _, tc := range cases {
t.Run(fmt.Sprintf("%s[%d,%d]", tc.n, tc.min, tc.max), func(t *testing.T) {
vals := makeValues(5, 10, 20)
if got := vals.Contains(tc.min, tc.max); got != tc.exp {
t.Errorf("Contains -got/+exp\n%s", cmp.Diff(got, tc.exp))
}
})
}
}
func TestIntegerValues_Merge(t *testing.T) {
integerValue := func(t int64, f int64) tsm1.IntegerValue {
return tsm1.NewValue(t, f).(tsm1.IntegerValue)