Fix pathalogical TSM query case
This fixes a pathalogical query condition cause by and problematic structuring of TSM files based on how points were written. The condition can occur when there are multiple TSM files and a large number of points are written into the past. The earlier existing TSM files must also have points in the past and close to the present causing their time range to eclipse the later files. When this condition occurs, some queries can spend an excessive amount of time merge all the overlapping blocks. The fix was to constrain the window of overlapping blocks based on the first one we ran into. There was also a simple case in the Merge where we could skip the binary search path and just append the two inputs.pull/6725/head
parent
2cbddb3efd
commit
0b481ff627
|
@ -0,0 +1,752 @@
|
|||
// Generated by tmpl
|
||||
// https://github.com/benbjohnson/tmpl
|
||||
//
|
||||
// DO NOT EDIT!
|
||||
// Source: encoding.gen.go.tmpl
|
||||
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// Values represents a slice of values.
|
||||
type Values []Value
|
||||
|
||||
func (a Values) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a Values) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a Values) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a Values) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a Values) Deduplicate() Values {
|
||||
m := make(map[int64]Value)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(Values, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a Values) Exclude(min, max int64) Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a Values) Include(min, max int64) Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a Values) Merge(b Values) Values {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
|
||||
return append(a, b...)
|
||||
}
|
||||
|
||||
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
|
||||
return append(b, a...)
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
var k int
|
||||
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
|
||||
// Fast path where a is after b, we skip the search
|
||||
k = len(b)
|
||||
} else {
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
}
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a Values) Len() int { return len(a) }
|
||||
func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a Values) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
// FloatValues represents a slice of Float values.
|
||||
type FloatValues []FloatValue
|
||||
|
||||
func (a FloatValues) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a FloatValues) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a FloatValues) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a FloatValues) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a FloatValues) Deduplicate() FloatValues {
|
||||
m := make(map[int64]FloatValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(FloatValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a FloatValues) Exclude(min, max int64) FloatValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a FloatValues) Include(min, max int64) FloatValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a FloatValues) Merge(b FloatValues) FloatValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
|
||||
return append(a, b...)
|
||||
}
|
||||
|
||||
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
|
||||
return append(b, a...)
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
var k int
|
||||
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
|
||||
// Fast path where a is after b, we skip the search
|
||||
k = len(b)
|
||||
} else {
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
}
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a FloatValues) Len() int { return len(a) }
|
||||
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a FloatValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
// IntegerValues represents a slice of Integer values.
|
||||
type IntegerValues []IntegerValue
|
||||
|
||||
func (a IntegerValues) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a IntegerValues) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a IntegerValues) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a IntegerValues) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a IntegerValues) Deduplicate() IntegerValues {
|
||||
m := make(map[int64]IntegerValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(IntegerValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a IntegerValues) Exclude(min, max int64) IntegerValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a IntegerValues) Include(min, max int64) IntegerValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
|
||||
return append(a, b...)
|
||||
}
|
||||
|
||||
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
|
||||
return append(b, a...)
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
var k int
|
||||
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
|
||||
// Fast path where a is after b, we skip the search
|
||||
k = len(b)
|
||||
} else {
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
}
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a IntegerValues) Len() int { return len(a) }
|
||||
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a IntegerValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
// StringValues represents a slice of String values.
|
||||
type StringValues []StringValue
|
||||
|
||||
func (a StringValues) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a StringValues) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a StringValues) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a StringValues) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a StringValues) Deduplicate() StringValues {
|
||||
m := make(map[int64]StringValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(StringValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a StringValues) Exclude(min, max int64) StringValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a StringValues) Include(min, max int64) StringValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a StringValues) Merge(b StringValues) StringValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
|
||||
return append(a, b...)
|
||||
}
|
||||
|
||||
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
|
||||
return append(b, a...)
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
var k int
|
||||
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
|
||||
// Fast path where a is after b, we skip the search
|
||||
k = len(b)
|
||||
} else {
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
}
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a StringValues) Len() int { return len(a) }
|
||||
func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a StringValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
// BooleanValues represents a slice of Boolean values.
|
||||
type BooleanValues []BooleanValue
|
||||
|
||||
func (a BooleanValues) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a BooleanValues) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a BooleanValues) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a BooleanValues) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a BooleanValues) Deduplicate() BooleanValues {
|
||||
m := make(map[int64]BooleanValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(BooleanValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a BooleanValues) Exclude(min, max int64) BooleanValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a BooleanValues) Include(min, max int64) BooleanValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
|
||||
return append(a, b...)
|
||||
}
|
||||
|
||||
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
|
||||
return append(b, a...)
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
var k int
|
||||
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
|
||||
// Fast path where a is after b, we skip the search
|
||||
k = len(b)
|
||||
} else {
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
}
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a BooleanValues) Len() int { return len(a) }
|
||||
func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a BooleanValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
|
@ -0,0 +1,160 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
)
|
||||
|
||||
{{range .}}
|
||||
|
||||
// {{.Name}}Values represents a slice of {{.Name}} values.
|
||||
type {{.Name}}Values []{{.Name}}Value
|
||||
|
||||
func (a {{.Name}}Values) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a {{.Name}}Values) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a {{.Name}}Values) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a {{.Name}}Values) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a {{.Name}}Values) Deduplicate() {{.Name}}Values {
|
||||
m := make(map[int64]{{.Name}}Value)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make({{.Name}}Values, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a {{.Name}}Values) Exclude(min, max int64) {{.Name}}Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a {{.Name}}Values) Include(min, max int64) {{.Name}}Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
|
||||
return append(a, b...)
|
||||
}
|
||||
|
||||
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
|
||||
return append(b, a...)
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
var k int
|
||||
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
|
||||
// Fast path where a is after b, we skip the search
|
||||
k = len(b)
|
||||
} else {
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
}
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a {{.Name}}Values) Len() int { return len(a) }
|
||||
func (a {{.Name}}Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a {{.Name}}Values) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
|
||||
{{ end }}
|
|
@ -0,0 +1,22 @@
|
|||
[
|
||||
{
|
||||
"Name":"",
|
||||
"name":""
|
||||
},
|
||||
{
|
||||
"Name":"Float",
|
||||
"name":"float"
|
||||
},
|
||||
{
|
||||
"Name":"Integer",
|
||||
"name":"integer"
|
||||
},
|
||||
{
|
||||
"Name":"String",
|
||||
"name":"string"
|
||||
},
|
||||
{
|
||||
"Name":"Boolean",
|
||||
"name":"boolean"
|
||||
}
|
||||
]
|
|
@ -3,10 +3,8 @@ package tsm1
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
@ -66,121 +64,6 @@ func (_ *IntegerValue) internalOnly() {}
|
|||
func (_ *BooleanValue) internalOnly() {}
|
||||
func (_ *FloatValue) internalOnly() {}
|
||||
|
||||
// Values represented a time ascending sorted collection of Value types.
|
||||
// the underlying type should be the same across all values, but the interface
|
||||
// makes the code cleaner.
|
||||
type Values []Value
|
||||
|
||||
func (a Values) MinTime() int64 {
|
||||
return a[0].UnixNano()
|
||||
}
|
||||
|
||||
func (a Values) MaxTime() int64 {
|
||||
return a[len(a)-1].UnixNano()
|
||||
}
|
||||
|
||||
func (a Values) Size() int {
|
||||
sz := 0
|
||||
for _, v := range a {
|
||||
sz += v.Size()
|
||||
}
|
||||
return sz
|
||||
}
|
||||
|
||||
func (a Values) assertOrdered() {
|
||||
if len(a) <= 1 {
|
||||
return
|
||||
}
|
||||
for i := 1; i < len(a); i++ {
|
||||
if av, ab := a[i-1].UnixNano(), a[i].UnixNano(); av >= ab {
|
||||
spew.Dump(a)
|
||||
panic(fmt.Sprintf("not ordered: %d %d >= %d", i, av, ab))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a Values) Merge(b Values) Values {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a Values) Exclude(min, max int64) Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a Values) Include(min, max int64) Values {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Encode converts the values to a byte slice. If there are no values,
|
||||
// this function panics.
|
||||
func (a Values) Encode(buf []byte) ([]byte, error) {
|
||||
|
@ -304,28 +187,6 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Deduplicate returns a new Values slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a Values) Deduplicate() Values {
|
||||
m := make(map[int64]Value, len(a))
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make([]Value, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(Values(other))
|
||||
return other
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a Values) Len() int { return len(a) }
|
||||
func (a Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a Values) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
type FloatValue struct {
|
||||
unixnano int64
|
||||
value float64
|
||||
|
@ -429,113 +290,6 @@ func DecodeFloatBlock(block []byte, tdec *TimeDecoder, vdec *FloatDecoder, a *[]
|
|||
return (*a)[:i], nil
|
||||
}
|
||||
|
||||
// FloatValues represents a slice of float values.
|
||||
type FloatValues []FloatValue
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a FloatValues) Deduplicate() FloatValues {
|
||||
m := make(map[int64]FloatValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(FloatValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a FloatValues) Merge(b FloatValues) FloatValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a FloatValues) Exclude(min, max int64) FloatValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a FloatValues) Include(min, max int64) FloatValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a FloatValues) Len() int { return len(a) }
|
||||
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a FloatValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
type BooleanValue struct {
|
||||
unixnano int64
|
||||
value bool
|
||||
|
@ -635,113 +389,6 @@ func DecodeBooleanBlock(block []byte, tdec *TimeDecoder, vdec *BooleanDecoder, a
|
|||
return (*a)[:i], nil
|
||||
}
|
||||
|
||||
// BooleanValues represents a slice of boolean values.
|
||||
type BooleanValues []BooleanValue
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a BooleanValues) Deduplicate() BooleanValues {
|
||||
m := make(map[int64]BooleanValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(BooleanValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a BooleanValues) Exclude(min, max int64) BooleanValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a BooleanValues) Include(min, max int64) BooleanValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a BooleanValues) Len() int { return len(a) }
|
||||
func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a BooleanValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
type IntegerValue struct {
|
||||
unixnano int64
|
||||
value int64
|
||||
|
@ -829,113 +476,6 @@ func DecodeIntegerBlock(block []byte, tdec *TimeDecoder, vdec *IntegerDecoder, a
|
|||
return (*a)[:i], nil
|
||||
}
|
||||
|
||||
// IntegerValues represents a slice of integer values.
|
||||
type IntegerValues []IntegerValue
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a IntegerValues) Deduplicate() IntegerValues {
|
||||
m := make(map[int64]IntegerValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(IntegerValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a IntegerValues) Exclude(min, max int64) IntegerValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a IntegerValues) Include(min, max int64) IntegerValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a IntegerValues) Len() int { return len(a) }
|
||||
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a IntegerValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
type StringValue struct {
|
||||
unixnano int64
|
||||
value string
|
||||
|
@ -1025,113 +565,6 @@ func DecodeStringBlock(block []byte, tdec *TimeDecoder, vdec *StringDecoder, a *
|
|||
return (*a)[:i], nil
|
||||
}
|
||||
|
||||
// StringValues represents a slice of string values.
|
||||
type StringValues []StringValue
|
||||
|
||||
// Deduplicate returns a new slice with any values that have the same timestamp removed.
|
||||
// The Value that appears last in the slice is the one that is kept.
|
||||
func (a StringValues) Deduplicate() StringValues {
|
||||
m := make(map[int64]StringValue)
|
||||
for _, val := range a {
|
||||
m[val.UnixNano()] = val
|
||||
}
|
||||
|
||||
other := make(StringValues, 0, len(m))
|
||||
for _, val := range m {
|
||||
other = append(other, val)
|
||||
}
|
||||
|
||||
sort.Sort(other)
|
||||
return other
|
||||
}
|
||||
|
||||
// Exclude returns the subset of values not in [min, max]
|
||||
func (a StringValues) Exclude(min, max int64) StringValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive.
|
||||
func (a StringValues) Include(min, max int64) StringValues {
|
||||
var i int
|
||||
for j := 0; j < len(a); j++ {
|
||||
if a[j].UnixNano() < min || a[j].UnixNano() > max {
|
||||
continue
|
||||
}
|
||||
|
||||
a[i] = a[j]
|
||||
i++
|
||||
}
|
||||
return a[:i]
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a StringValues) Merge(b StringValues) StringValues {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
for i := 0; i < len(a) && len(b) > 0; i++ {
|
||||
av, bv := a[i].UnixNano(), b[0].UnixNano()
|
||||
// Value in a is greater than B, we need to merge
|
||||
if av > bv {
|
||||
// Save value in a
|
||||
temp := a[i]
|
||||
|
||||
// Overwrite a with b
|
||||
a[i] = b[0]
|
||||
|
||||
// Slide all values of b down 1
|
||||
copy(b, b[1:])
|
||||
b = b[:len(b)-1]
|
||||
|
||||
// See where value we save from a should be inserted in b to keep b sorted
|
||||
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
|
||||
|
||||
if k == len(b) {
|
||||
// Last position?
|
||||
b = append(b, temp)
|
||||
} else if b[k].UnixNano() != temp.UnixNano() {
|
||||
// Save the last element, since it will get overwritten
|
||||
last := b[len(b)-1]
|
||||
// Somewhere in the middle of b, insert it only if it's not a duplicate
|
||||
copy(b[k+1:], b[k:])
|
||||
// Add the last vale to the end
|
||||
b = append(b, last)
|
||||
b[k] = temp
|
||||
}
|
||||
} else if av == bv {
|
||||
// Value in a an b are the same, use b
|
||||
a[i] = b[0]
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
if len(b) > 0 {
|
||||
return append(a, b...)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Sort methods
|
||||
func (a StringValues) Len() int { return len(a) }
|
||||
func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a StringValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
|
||||
|
||||
func packBlockHeader(blockType byte) []byte {
|
||||
return []byte{blockType}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ import (
|
|||
)
|
||||
|
||||
//go:generate tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl
|
||||
//go:generate tmpl -data=@file_store.gen.go.tmpldata file_store.gen.go.tmpl
|
||||
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
|
||||
|
||||
func init() {
|
||||
tsdb.RegisterEngine("tsm1", NewEngine)
|
||||
|
|
|
@ -0,0 +1,527 @@
|
|||
// Generated by tmpl
|
||||
// https://github.com/benbjohnson/tmpl
|
||||
//
|
||||
// DO NOT EDIT!
|
||||
// Source: file_store.gen.go.tmpl
|
||||
|
||||
package tsm1
|
||||
|
||||
// ReadFloatBlock reads the next block as a set of float values.
|
||||
func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[]FloatValue) ([]FloatValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = FloatValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterFloatValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Use the current block time range as our overlapping window
|
||||
minT, maxT := values[0].UnixNano(), values[len(values)-1].UnixNano()
|
||||
if c.ascending {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
maxT = cur.entry.MaxTime
|
||||
values = FloatValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []FloatValue
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterFloatValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = FloatValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
// Only use values in the overlapping window
|
||||
v = FloatValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the remaing values with the existing
|
||||
values = FloatValues(values).Merge(v)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
minT = cur.entry.MinTime
|
||||
values = FloatValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []FloatValue
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterFloatValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = FloatValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = FloatValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = FloatValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
first.markRead(minT, maxT)
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
// ReadIntegerBlock reads the next block as a set of integer values.
|
||||
func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, buf *[]IntegerValue) ([]IntegerValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = IntegerValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterIntegerValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Use the current block time range as our overlapping window
|
||||
minT, maxT := values[0].UnixNano(), values[len(values)-1].UnixNano()
|
||||
if c.ascending {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
maxT = cur.entry.MaxTime
|
||||
values = IntegerValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []IntegerValue
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterIntegerValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = IntegerValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
// Only use values in the overlapping window
|
||||
v = IntegerValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the remaing values with the existing
|
||||
values = IntegerValues(values).Merge(v)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
minT = cur.entry.MinTime
|
||||
values = IntegerValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []IntegerValue
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterIntegerValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = IntegerValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = IntegerValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = IntegerValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
first.markRead(minT, maxT)
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
// ReadStringBlock reads the next block as a set of string values.
|
||||
func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf *[]StringValue) ([]StringValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = StringValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterStringValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Use the current block time range as our overlapping window
|
||||
minT, maxT := values[0].UnixNano(), values[len(values)-1].UnixNano()
|
||||
if c.ascending {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
maxT = cur.entry.MaxTime
|
||||
values = StringValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []StringValue
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterStringValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = StringValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
// Only use values in the overlapping window
|
||||
v = StringValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the remaing values with the existing
|
||||
values = StringValues(values).Merge(v)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
minT = cur.entry.MinTime
|
||||
values = StringValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []StringValue
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterStringValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = StringValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = StringValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = StringValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
first.markRead(minT, maxT)
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
// ReadBooleanBlock reads the next block as a set of boolean values.
|
||||
func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, buf *[]BooleanValue) ([]BooleanValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = BooleanValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterBooleanValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Use the current block time range as our overlapping window
|
||||
minT, maxT := values[0].UnixNano(), values[len(values)-1].UnixNano()
|
||||
if c.ascending {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
maxT = cur.entry.MaxTime
|
||||
values = BooleanValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []BooleanValue
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterBooleanValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = BooleanValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
// Only use values in the overlapping window
|
||||
v = BooleanValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the remaing values with the existing
|
||||
values = BooleanValues(values).Merge(v)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
minT = cur.entry.MinTime
|
||||
values = BooleanValues(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []BooleanValue
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterBooleanValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = BooleanValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = BooleanValues(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = BooleanValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
first.markRead(minT, maxT)
|
||||
|
||||
return values, err
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
package tsm1
|
||||
|
||||
{{range .}}
|
||||
// Read{{.Name}}Block reads the next block as a set of {{.name}} values.
|
||||
func (c *KeyCursor) Read{{.Name}}Block(tdec *TimeDecoder, vdec *{{.Name}}Decoder, buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're searching for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.Read{{.Name}}BlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = {{.Name}}Values(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filter{{.Name}}Values(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Use the current block time range as our overlapping window
|
||||
minT, maxT := values[0].UnixNano(), values[len(values)-1].UnixNano()
|
||||
if c.ascending {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
maxT = cur.entry.MaxTime
|
||||
values = {{.Name}}Values(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
var a []{{.Name}}Value
|
||||
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filter{{.Name}}Values(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = {{.Name}}Values(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
// Only use values in the overlapping window
|
||||
v = {{.Name}}Values(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the remaing values with the existing
|
||||
values = {{.Name}}Values(values).Merge(v)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Find first block that overlaps our window
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
||||
// Shrink our window so it's the intersection of the first overlapping block and the
|
||||
// first block. We do this to minimize the region that overlaps and needs to
|
||||
// be merged.
|
||||
minT = cur.entry.MinTime
|
||||
values = {{.Name}}Values(values).Include(minT, maxT)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Search the remaining blocks that overlap our window and append their values so we can
|
||||
// merge them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
|
||||
var a []{{.Name}}Value
|
||||
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filter{{.Name}}Values(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = {{.Name}}Values(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = {{.Name}}Values(v).Include(minT, maxT)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = {{.Name}}Values(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
first.markRead(minT, maxT)
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
{{ end }}
|
|
@ -0,0 +1,18 @@
|
|||
[
|
||||
{
|
||||
"Name":"Float",
|
||||
"name":"float"
|
||||
},
|
||||
{
|
||||
"Name":"Integer",
|
||||
"name":"integer"
|
||||
},
|
||||
{
|
||||
"Name":"String",
|
||||
"name":"string"
|
||||
},
|
||||
{
|
||||
"Name":"Boolean",
|
||||
"name":"boolean"
|
||||
}
|
||||
]
|
|
@ -720,7 +720,7 @@ type descLocations []*location
|
|||
func (a descLocations) Len() int { return len(a) }
|
||||
func (a descLocations) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a descLocations) Less(i, j int) bool {
|
||||
if a[i].entry.MaxTime == a[j].entry.MaxTime {
|
||||
if a[i].entry.OverlapsTimeRange(a[j].entry.MinTime, a[j].entry.MaxTime) {
|
||||
return a[i].r.Path() < a[j].r.Path()
|
||||
}
|
||||
return a[i].entry.MaxTime < a[j].entry.MaxTime
|
||||
|
@ -732,7 +732,7 @@ type ascLocations []*location
|
|||
func (a ascLocations) Len() int { return len(a) }
|
||||
func (a ascLocations) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a ascLocations) Less(i, j int) bool {
|
||||
if a[i].entry.MinTime == a[j].entry.MinTime {
|
||||
if a[i].entry.OverlapsTimeRange(a[j].entry.MinTime, a[j].entry.MaxTime) {
|
||||
return a[i].r.Path() < a[j].r.Path()
|
||||
}
|
||||
return a[i].entry.MinTime < a[j].entry.MinTime
|
||||
|
@ -839,6 +839,10 @@ func (c *KeyCursor) seekDescending(t int64) {
|
|||
// Next moves the cursor to the next position.
|
||||
// Data should be read by the ReadBlock functions.
|
||||
func (c *KeyCursor) Next() {
|
||||
// Do we still have unread values in the current block
|
||||
if !c.current[0].read() {
|
||||
return
|
||||
}
|
||||
c.current = c.current[:0]
|
||||
if c.ascending {
|
||||
c.nextAscending()
|
||||
|
@ -918,366 +922,6 @@ func (c *KeyCursor) nextDescending() {
|
|||
}
|
||||
}
|
||||
|
||||
// ReadFloatBlock reads the next block as a set of float values.
|
||||
func (c *KeyCursor) ReadFloatBlock(tdec *TimeDecoder, vdec *FloatDecoder, buf *[]FloatValue) ([]FloatValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're search for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadFloatBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = FloatValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Record the range of values we're using
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterFloatValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ascending {
|
||||
var a []FloatValue
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterFloatValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = FloatValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
v = FloatValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the new values with the existing, but only include the values with the range
|
||||
// of the first block we are decoding
|
||||
values = FloatValues(values).Merge(v)
|
||||
}
|
||||
|
||||
} else {
|
||||
var a []FloatValue
|
||||
v, err := cur.r.ReadFloatBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterFloatValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = FloatValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = FloatValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = FloatValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
// ReadIntegerBlock reads the next block as a set of integer values.
|
||||
func (c *KeyCursor) ReadIntegerBlock(tdec *TimeDecoder, vdec *IntegerDecoder, buf *[]IntegerValue) ([]IntegerValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're search for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadIntegerBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = IntegerValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Record the range of values we're using
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterIntegerValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ascending {
|
||||
var a []IntegerValue
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterIntegerValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = IntegerValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
v = IntegerValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the new values with the existing, but only include the values with the range
|
||||
// of the first block we are decoding
|
||||
values = IntegerValues(values).Merge(v)
|
||||
}
|
||||
|
||||
} else {
|
||||
var a []IntegerValue
|
||||
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterIntegerValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = IntegerValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = IntegerValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = IntegerValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
// ReadStringBlock reads the next block as a set of string values.
|
||||
func (c *KeyCursor) ReadStringBlock(tdec *TimeDecoder, vdec *StringDecoder, buf *[]StringValue) ([]StringValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're search for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadStringBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = StringValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Record the range of values we're using
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterStringValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ascending {
|
||||
var a []StringValue
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterStringValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = StringValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
v = StringValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the new values with the existing, but only include the values with the range
|
||||
// of the first block we are decoding
|
||||
values = StringValues(values).Merge(v)
|
||||
}
|
||||
|
||||
} else {
|
||||
var a []StringValue
|
||||
v, err := cur.r.ReadStringBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterStringValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = StringValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = StringValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = StringValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
// ReadBooleanBlock reads the next block as a set of boolean values.
|
||||
func (c *KeyCursor) ReadBooleanBlock(tdec *TimeDecoder, vdec *BooleanDecoder, buf *[]BooleanValue) ([]BooleanValue, error) {
|
||||
// No matching blocks to decode
|
||||
if len(c.current) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// First block is the oldest block containing the points we're search for.
|
||||
first := c.current[0]
|
||||
*buf = (*buf)[:0]
|
||||
values, err := first.r.ReadBooleanBlockAt(&first.entry, tdec, vdec, buf)
|
||||
|
||||
// Remove values we already read
|
||||
values = BooleanValues(values).Exclude(first.readMin, first.readMax)
|
||||
|
||||
// Record the range of values we're using
|
||||
if len(values) > 0 {
|
||||
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
||||
}
|
||||
|
||||
// Remove any tombstones
|
||||
tombstones := first.r.TombstoneRange(c.key)
|
||||
values = c.filterBooleanValues(tombstones, values)
|
||||
|
||||
// Only one block with this key and time range so return it
|
||||
if len(c.current) == 1 {
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Otherwise, search the remaining blocks that overlap and append their values so we can
|
||||
// dedup them.
|
||||
for i := 1; i < len(c.current); i++ {
|
||||
cur := c.current[i]
|
||||
tombstones := cur.r.TombstoneRange(c.key)
|
||||
// Skip this block if it doesn't contain points we looking for or they have already been read
|
||||
if !cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) || cur.read() {
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ascending {
|
||||
var a []BooleanValue
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterBooleanValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = BooleanValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
if len(v) > 0 {
|
||||
v = BooleanValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
// Merge the new values with the existing, but only include the values with the range
|
||||
// of the first block we are decoding
|
||||
values = BooleanValues(values).Merge(v)
|
||||
}
|
||||
|
||||
} else {
|
||||
var a []BooleanValue
|
||||
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, tdec, vdec, &a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any tombstoned values
|
||||
v = c.filterBooleanValues(tombstones, v)
|
||||
|
||||
// Remove values we already read
|
||||
v = BooleanValues(v).Exclude(cur.readMin, cur.readMax)
|
||||
|
||||
// If the block we decoded should have all of it's values included, mark it as read so we
|
||||
// don't use it again.
|
||||
if len(v) > 0 {
|
||||
v = BooleanValues(v).Include(first.entry.MinTime, first.entry.MaxTime)
|
||||
|
||||
if len(v) > 0 {
|
||||
cur.markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||||
}
|
||||
values = BooleanValues(v).Merge(values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return values, err
|
||||
}
|
||||
|
||||
func (c *KeyCursor) filterFloatValues(tombstones []TimeRange, values FloatValues) FloatValues {
|
||||
for _, t := range tombstones {
|
||||
values = values.Exclude(t.Min, t.Max)
|
||||
|
|
|
@ -206,7 +206,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
|
|||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 0.0), tsm1.NewValue(1, 1.0)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, 3.0)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 4.0), tsm1.NewValue(7, 7.0)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 4.0), tsm1.NewValue(2, 7.0)}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
|
@ -227,6 +227,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
|
|||
exp := []tsm1.Value{
|
||||
data[3].values[0],
|
||||
data[0].values[1],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
@ -246,9 +247,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
|
|||
}
|
||||
|
||||
exp = []tsm1.Value{
|
||||
data[1].values[0],
|
||||
data[2].values[0],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
@ -274,7 +273,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
|
|||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(0)), tsm1.NewValue(1, int64(1))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(4)), tsm1.NewValue(7, int64(7))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, int64(4)), tsm1.NewValue(2, int64(7))}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
|
@ -295,6 +294,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
|
|||
exp := []tsm1.Value{
|
||||
data[3].values[0],
|
||||
data[0].values[1],
|
||||
data[3].values[1],
|
||||
}
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -313,9 +313,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
|
|||
}
|
||||
|
||||
exp = []tsm1.Value{
|
||||
data[1].values[0],
|
||||
data[2].values[0],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
@ -341,7 +339,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
|
|||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, true), tsm1.NewValue(1, false)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, true)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, true)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, false), tsm1.NewValue(7, true)}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, false), tsm1.NewValue(2, true)}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
|
@ -362,6 +360,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
|
|||
exp := []tsm1.Value{
|
||||
data[3].values[0],
|
||||
data[0].values[1],
|
||||
data[3].values[1],
|
||||
}
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -380,9 +379,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
|
|||
}
|
||||
|
||||
exp = []tsm1.Value{
|
||||
data[1].values[0],
|
||||
data[2].values[0],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
@ -408,7 +405,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
|
|||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "zero"), tsm1.NewValue(1, "one")}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, "two")}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, "three")}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "four"), tsm1.NewValue(7, "seven")}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, "four"), tsm1.NewValue(2, "seven")}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
|
@ -429,6 +426,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
|
|||
exp := []tsm1.Value{
|
||||
data[3].values[0],
|
||||
data[0].values[1],
|
||||
data[3].values[1],
|
||||
}
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
|
||||
|
@ -447,9 +445,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
|
|||
}
|
||||
|
||||
exp = []tsm1.Value{
|
||||
data[1].values[0],
|
||||
data[2].values[0],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
@ -784,7 +780,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
|
|||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, int64(0)), tsm1.NewValue(9, int64(1))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, int64(2))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(3))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(4)), tsm1.NewValue(7, int64(7))}},
|
||||
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, int64(4)), tsm1.NewValue(10, int64(7))}},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
|
@ -804,6 +800,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
|
|||
exp := []tsm1.Value{
|
||||
data[0].values[0],
|
||||
data[0].values[1],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
@ -825,7 +822,6 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
|
|||
|
||||
exp = []tsm1.Value{
|
||||
data[3].values[0],
|
||||
data[3].values[1],
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(exp); got != exp {
|
||||
|
|
Loading…
Reference in New Issue