Address PR feedback
parent
a792fbbdfa
commit
5054d6fae4
File diff suppressed because it is too large
Load Diff
|
@ -1,211 +0,0 @@
|
|||
package tsdb
|
||||
|
||||
{{range .}}
|
||||
|
||||
{{ $typename := print .Name "Array" }}
|
||||
|
||||
type {{ $typename }} struct {
|
||||
Timestamps []int64
|
||||
Values []{{.Type}}
|
||||
}
|
||||
|
||||
func New{{$typename}}Len(sz int) *{{$typename}} {
|
||||
return &{{$typename}}{
|
||||
Timestamps: make([]int64, sz),
|
||||
Values: make([]{{.Type}}, sz),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *{{ $typename }}) MinTime() int64 {
|
||||
return a.Timestamps[0]
|
||||
}
|
||||
|
||||
func (a *{{ $typename }}) MaxTime() int64 {
|
||||
return a.Timestamps[len(a.Timestamps)-1]
|
||||
}
|
||||
|
||||
func (a *{{ $typename }}) Size() int {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (a *{{ $typename}}) Len() int {
|
||||
if a == nil {
|
||||
return 0
|
||||
}
|
||||
return len(a.Timestamps)
|
||||
}
|
||||
|
||||
// Exclude removes the subset of values in [min, max]. The values must
|
||||
// be deduplicated and sorted before calling Exclude or the results are undefined.
|
||||
func (a *{{ $typename }}) Exclude(min, max int64) {
|
||||
rmin, rmax := a.FindRange(min, max)
|
||||
if rmin == -1 && rmax == -1 {
|
||||
return
|
||||
}
|
||||
|
||||
// a.Timestamps[rmin] ≥ min
|
||||
// a.Timestamps[rmax] ≥ max
|
||||
|
||||
if rmax < a.Len() {
|
||||
if a.Timestamps[rmax] == max {
|
||||
rmax++
|
||||
}
|
||||
rest := a.Len()-rmax
|
||||
if rest > 0 {
|
||||
ts := a.Timestamps[:rmin+rest]
|
||||
copy(ts[rmin:], a.Timestamps[rmax:])
|
||||
a.Timestamps = ts
|
||||
|
||||
vs := a.Values[:rmin+rest]
|
||||
copy(vs[rmin:], a.Values[rmax:])
|
||||
a.Values = vs
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
a.Timestamps = a.Timestamps[:rmin]
|
||||
a.Values = a.Values[:rmin]
|
||||
}
|
||||
|
||||
// Include returns the subset values between min and max inclusive. The values must
|
||||
// be deduplicated and sorted before calling Include or the results are undefined.
|
||||
func (a *{{ $typename }}) Include(min, max int64) {
|
||||
rmin, rmax := a.FindRange(min, max)
|
||||
if rmin == -1 && rmax == -1 {
|
||||
a.Timestamps = a.Timestamps[:0]
|
||||
a.Values = a.Values[:0]
|
||||
return
|
||||
}
|
||||
|
||||
// a.Timestamps[rmin] ≥ min
|
||||
// a.Timestamps[rmax] ≥ max
|
||||
|
||||
if rmax < a.Len() && a.Timestamps[rmax] == max {
|
||||
rmax++
|
||||
}
|
||||
|
||||
if rmin > -1 {
|
||||
ts := a.Timestamps[:rmax-rmin]
|
||||
copy(ts, a.Timestamps[rmin:rmax])
|
||||
a.Timestamps = ts
|
||||
vs := a.Values[:rmax-rmin]
|
||||
copy(vs, a.Values[rmin:rmax])
|
||||
a.Values = vs
|
||||
} else {
|
||||
a.Timestamps = a.Timestamps[:rmax]
|
||||
a.Values = a.Values[:rmax]
|
||||
}
|
||||
}
|
||||
|
||||
// search performs a binary search for UnixNano() v in a
|
||||
// and returns the position, i, where v would be inserted.
|
||||
// An additional check of a.Timestamps[i] == v is necessary
|
||||
// to determine if the value v exists.
|
||||
func (a *{{ $typename }}) search(v int64) int {
|
||||
// Define: f(x) → a.Timestamps[x] < v
|
||||
// Define: f(-1) == true, f(n) == false
|
||||
// Invariant: f(lo-1) == true, f(hi) == false
|
||||
lo := 0
|
||||
hi := a.Len()
|
||||
for lo < hi {
|
||||
mid := int(uint(lo+hi) >> 1)
|
||||
if a.Timestamps[mid] < v {
|
||||
lo = mid + 1 // preserves f(lo-1) == true
|
||||
} else {
|
||||
hi = mid // preserves f(hi) == false
|
||||
}
|
||||
}
|
||||
|
||||
// lo == hi
|
||||
return lo
|
||||
}
|
||||
|
||||
// FindRange returns the positions where min and max would be
|
||||
// inserted into the array. If a[0].UnixNano() > max or
|
||||
// a[len-1].UnixNano() < min then FindRange returns (-1, -1)
|
||||
// indicating the array is outside the [min, max]. The values must
|
||||
// be deduplicated and sorted before calling Exclude or the results
|
||||
// are undefined.
|
||||
func (a *{{ $typename }}) FindRange(min, max int64) (int, int) {
|
||||
if a.Len() == 0 || min > max {
|
||||
return -1, -1
|
||||
}
|
||||
|
||||
minVal := a.MinTime()
|
||||
maxVal := a.MaxTime()
|
||||
|
||||
if maxVal < min || minVal > max {
|
||||
return -1, -1
|
||||
}
|
||||
|
||||
return a.search(min), a.search(max)
|
||||
}
|
||||
|
||||
// Merge overlays b to top of a. If two values conflict with
|
||||
// the same timestamp, b is used. Both a and b must be sorted
|
||||
// in ascending order.
|
||||
func (a *{{ $typename }}) Merge(b *{{ $typename }}) {
|
||||
if a.Len() == 0 {
|
||||
*a = *b
|
||||
return
|
||||
}
|
||||
|
||||
if b.Len() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
|
||||
// possible stored blocks might contain duplicate values. Remove them if they exists before
|
||||
// merging.
|
||||
// a = a.Deduplicate()
|
||||
// b = b.Deduplicate()
|
||||
|
||||
if a.MaxTime() < b.MinTime() {
|
||||
a.Timestamps = append(a.Timestamps, b.Timestamps...)
|
||||
a.Values = append(a.Values, b.Values...)
|
||||
return
|
||||
}
|
||||
|
||||
if b.MaxTime() < a.MinTime() {
|
||||
var tmp {{$typename}}
|
||||
tmp.Timestamps = append(b.Timestamps, a.Timestamps...)
|
||||
tmp.Values = append(b.Values, a.Values...)
|
||||
*a = tmp
|
||||
return
|
||||
}
|
||||
|
||||
out := New{{$typename}}Len(a.Len()+b.Len())
|
||||
i, j, k := 0, 0, 0
|
||||
for i < len(a.Timestamps) && j < len(b.Timestamps) {
|
||||
if a.Timestamps[i] < b.Timestamps[j] {
|
||||
out.Timestamps[k] = a.Timestamps[i]
|
||||
out.Values[k] = a.Values[i]
|
||||
i++
|
||||
} else if a.Timestamps[i] == b.Timestamps[j] {
|
||||
out.Timestamps[k] = b.Timestamps[j]
|
||||
out.Values[k] = b.Values[j]
|
||||
i++
|
||||
j++
|
||||
} else {
|
||||
out.Timestamps[k] = b.Timestamps[j]
|
||||
out.Values[k] = b.Values[j]
|
||||
j++
|
||||
}
|
||||
k++
|
||||
}
|
||||
|
||||
if i < len(a.Timestamps) {
|
||||
n := copy(out.Timestamps[k:], a.Timestamps[i:])
|
||||
copy(out.Values[k:], a.Values[i:])
|
||||
k += n
|
||||
} else if j < len(b.Timestamps) {
|
||||
n := copy(out.Timestamps[k:], b.Timestamps[j:])
|
||||
copy(out.Values[k:], b.Values[j:])
|
||||
k += n
|
||||
}
|
||||
|
||||
a.Timestamps = out.Timestamps[:k]
|
||||
a.Values = out.Values[:k]
|
||||
}
|
||||
|
||||
{{ end }}
|
|
@ -1,7 +0,0 @@
|
|||
/*
|
||||
Package tsdb implements a durable time series database.
|
||||
|
||||
*/
|
||||
package tsdb
|
||||
|
||||
//go:generate tmpl -data=@arrayvalues.gen.go.tmpldata arrayvalues.gen.go.tmpl
|
|
@ -184,7 +184,7 @@ func FloatArrayEncodeAll(src []float64, b []byte) ([]byte, error) {
|
|||
v = sigbits << 58 // Move 6 LSB of sigbits to MSB
|
||||
mask = v >> 56 // Move 6 MSB to 8 LSB
|
||||
if m <= 2 {
|
||||
// The 6 bits fir into the current byte.
|
||||
// The 6 bits fit into the current byte.
|
||||
b[n>>3] |= byte(mask >> m)
|
||||
n += l
|
||||
} else { // In this case there are fewer than 6 bits available in current byte.
|
||||
|
|
|
@ -65,7 +65,6 @@ func TestFloatArrayEncode_Compare(t *testing.T) {
|
|||
input[i] = (rand.Float64() * math.MaxFloat64) - math.MaxFloat32
|
||||
}
|
||||
|
||||
// Example from the paper
|
||||
s := tsm1.NewFloatEncoder()
|
||||
for _, v := range input {
|
||||
s.Write(v)
|
||||
|
|
|
@ -1098,75 +1098,74 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
|
|||
// Since we combined multiple blocks, we could have more values than we should put into
|
||||
// a single block. We need to chunk them up into groups and re-encode them.
|
||||
return k.chunkFloat(nil)
|
||||
} else {
|
||||
var i int
|
||||
}
|
||||
var i int
|
||||
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedFloatValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.FloatArray
|
||||
if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedFloatValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkFloat(k.merged)
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedFloatValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.FloatArray
|
||||
if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedFloatValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkFloat(k.merged)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) chunkFloat(dst blocks) blocks {
|
||||
|
@ -1303,75 +1302,74 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks {
|
|||
// Since we combined multiple blocks, we could have more values than we should put into
|
||||
// a single block. We need to chunk them up into groups and re-encode them.
|
||||
return k.chunkInteger(nil)
|
||||
} else {
|
||||
var i int
|
||||
}
|
||||
var i int
|
||||
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedIntegerValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.IntegerArray
|
||||
if err := DecodeIntegerArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedIntegerValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkInteger(k.merged)
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedIntegerValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.IntegerArray
|
||||
if err := DecodeIntegerArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedIntegerValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkInteger(k.merged)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) chunkInteger(dst blocks) blocks {
|
||||
|
@ -1508,75 +1506,74 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks {
|
|||
// Since we combined multiple blocks, we could have more values than we should put into
|
||||
// a single block. We need to chunk them up into groups and re-encode them.
|
||||
return k.chunkUnsigned(nil)
|
||||
} else {
|
||||
var i int
|
||||
}
|
||||
var i int
|
||||
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedUnsignedValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.UnsignedArray
|
||||
if err := DecodeUnsignedArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedUnsignedValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkUnsigned(k.merged)
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedUnsignedValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.UnsignedArray
|
||||
if err := DecodeUnsignedArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedUnsignedValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkUnsigned(k.merged)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) chunkUnsigned(dst blocks) blocks {
|
||||
|
@ -1713,75 +1710,74 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks {
|
|||
// Since we combined multiple blocks, we could have more values than we should put into
|
||||
// a single block. We need to chunk them up into groups and re-encode them.
|
||||
return k.chunkString(nil)
|
||||
} else {
|
||||
var i int
|
||||
}
|
||||
var i int
|
||||
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedStringValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.StringArray
|
||||
if err := DecodeStringArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedStringValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkString(k.merged)
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedStringValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.StringArray
|
||||
if err := DecodeStringArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedStringValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkString(k.merged)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) chunkString(dst blocks) blocks {
|
||||
|
@ -1918,75 +1914,74 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks {
|
|||
// Since we combined multiple blocks, we could have more values than we should put into
|
||||
// a single block. We need to chunk them up into groups and re-encode them.
|
||||
return k.chunkBoolean(nil)
|
||||
} else {
|
||||
var i int
|
||||
}
|
||||
var i int
|
||||
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedBooleanValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.BooleanArray
|
||||
if err := DecodeBooleanArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedBooleanValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkBoolean(k.merged)
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.mergedBooleanValues.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.BooleanArray
|
||||
if err := DecodeBooleanArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.mergedBooleanValues.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunkBoolean(k.merged)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) chunkBoolean(dst blocks) blocks {
|
||||
|
|
|
@ -301,75 +301,74 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
|
|||
// Since we combined multiple blocks, we could have more values than we should put into
|
||||
// a single block. We need to chunk them up into groups and re-encode them.
|
||||
return k.chunk{{.Name}}(nil)
|
||||
} else {
|
||||
var i int
|
||||
}
|
||||
var i int
|
||||
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
// If we this block is already full, just add it as is
|
||||
if BlockCount(k.blocks[i].b) >= k.size {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
|
||||
if k.fast {
|
||||
for i < len(k.blocks) {
|
||||
// skip this block if it's values were already read
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.merged{{.Name}}Values.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.{{.Name}}Array
|
||||
if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.merged{{.Name}}Values.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunk{{.Name}}(k.merged)
|
||||
}
|
||||
|
||||
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||||
if i == len(k.blocks)-1 {
|
||||
if !k.blocks[i].read() {
|
||||
k.merged = append(k.merged, k.blocks[i])
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// The remaining blocks can be combined and we know that they do not overlap and
|
||||
// so we can just append each, sort and re-encode.
|
||||
for i < len(k.blocks) && k.merged{{.Name}}Values.Len() < k.size {
|
||||
if k.blocks[i].read() {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
var v tsdb.{{.Name}}Array
|
||||
if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply each tombstone to the block
|
||||
for _, ts := range k.blocks[i].tombstones {
|
||||
v.Exclude(ts.Min, ts.Max)
|
||||
}
|
||||
|
||||
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||||
|
||||
k.merged{{.Name}}Values.Merge(&v)
|
||||
i++
|
||||
}
|
||||
|
||||
k.blocks = k.blocks[i:]
|
||||
|
||||
return k.chunk{{.Name}}(k.merged)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks {
|
||||
|
|
|
@ -1677,16 +1677,50 @@ RETRY:
|
|||
|
||||
// Read the next block from each TSM iterator
|
||||
for i, v := range k.buf {
|
||||
if len(v) == 0 {
|
||||
iter := k.iterators[i]
|
||||
if iter.Next() {
|
||||
if len(v) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
iter := k.iterators[i]
|
||||
if iter.Next() {
|
||||
key, minTime, maxTime, typ, _, b, err := iter.Read()
|
||||
if err != nil {
|
||||
k.err = err
|
||||
}
|
||||
|
||||
// This block may have ranges of time removed from it that would
|
||||
// reduce the block min and max time.
|
||||
tombstones := iter.r.TombstoneRange(key)
|
||||
|
||||
var blk *block
|
||||
if cap(k.buf[i]) > len(k.buf[i]) {
|
||||
k.buf[i] = k.buf[i][:len(k.buf[i])+1]
|
||||
blk = k.buf[i][len(k.buf[i])-1]
|
||||
if blk == nil {
|
||||
blk = &block{}
|
||||
k.buf[i][len(k.buf[i])-1] = blk
|
||||
}
|
||||
} else {
|
||||
blk = &block{}
|
||||
k.buf[i] = append(k.buf[i], blk)
|
||||
}
|
||||
blk.minTime = minTime
|
||||
blk.maxTime = maxTime
|
||||
blk.key = key
|
||||
blk.typ = typ
|
||||
blk.b = b
|
||||
blk.tombstones = tombstones
|
||||
blk.readMin = math.MaxInt64
|
||||
blk.readMax = math.MinInt64
|
||||
|
||||
blockKey := key
|
||||
for bytes.Equal(iter.PeekNext(), blockKey) {
|
||||
iter.Next()
|
||||
key, minTime, maxTime, typ, _, b, err := iter.Read()
|
||||
if err != nil {
|
||||
k.err = err
|
||||
}
|
||||
|
||||
// This block may have ranges of time removed from it that would
|
||||
// reduce the block min and max time.
|
||||
tombstones := iter.r.TombstoneRange(key)
|
||||
|
||||
var blk *block
|
||||
|
@ -1701,6 +1735,7 @@ RETRY:
|
|||
blk = &block{}
|
||||
k.buf[i] = append(k.buf[i], blk)
|
||||
}
|
||||
|
||||
blk.minTime = minTime
|
||||
blk.maxTime = maxTime
|
||||
blk.key = key
|
||||
|
@ -1709,44 +1744,11 @@ RETRY:
|
|||
blk.tombstones = tombstones
|
||||
blk.readMin = math.MaxInt64
|
||||
blk.readMax = math.MinInt64
|
||||
|
||||
blockKey := key
|
||||
for bytes.Equal(iter.PeekNext(), blockKey) {
|
||||
iter.Next()
|
||||
key, minTime, maxTime, typ, _, b, err := iter.Read()
|
||||
if err != nil {
|
||||
k.err = err
|
||||
}
|
||||
|
||||
tombstones := iter.r.TombstoneRange(key)
|
||||
|
||||
var blk *block
|
||||
if cap(k.buf[i]) > len(k.buf[i]) {
|
||||
k.buf[i] = k.buf[i][:len(k.buf[i])+1]
|
||||
blk = k.buf[i][len(k.buf[i])-1]
|
||||
if blk == nil {
|
||||
blk = &block{}
|
||||
k.buf[i][len(k.buf[i])-1] = blk
|
||||
}
|
||||
} else {
|
||||
blk = &block{}
|
||||
k.buf[i] = append(k.buf[i], blk)
|
||||
}
|
||||
|
||||
blk.minTime = minTime
|
||||
blk.maxTime = maxTime
|
||||
blk.key = key
|
||||
blk.typ = typ
|
||||
blk.b = b
|
||||
blk.tombstones = tombstones
|
||||
blk.readMin = math.MaxInt64
|
||||
blk.readMax = math.MinInt64
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if iter.Err() != nil {
|
||||
k.err = iter.Err()
|
||||
}
|
||||
if iter.Err() != nil {
|
||||
k.err = iter.Err()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue