224 lines
5.8 KiB
Cheetah
224 lines
5.8 KiB
Cheetah
|
package tsm1
|
||
|
|
||
|
import (
|
||
|
"runtime"
|
||
|
)
|
||
|
|
||
|
{{range .}}
|
||
|
|
||
|
// merge combines the next set of blocks into merged blocks.
|
||
|
func (k *tsmKeyIterator) merge{{.Name}}() {
|
||
|
// No blocks left, or pending merged values, we're done
|
||
|
if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.merged{{.Name}}Values) == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
dedup := len(k.merged{{.Name}}Values) != 0
|
||
|
if len(k.blocks) > 0 && !dedup {
|
||
|
// If we have more than one block or any partially tombstoned blocks, we many need to dedup
|
||
|
dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
|
||
|
|
||
|
// Quickly scan each block to see if any overlap with the prior block, if they overlap then
|
||
|
// we need to dedup as there may be duplicate points now
|
||
|
for i := 1; !dedup && i < len(k.blocks); i++ {
|
||
|
if k.blocks[i].partiallyRead() {
|
||
|
dedup = true
|
||
|
break
|
||
|
}
|
||
|
|
||
|
if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
|
||
|
dedup = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
k.merged = k.combine{{.Name}}(dedup)
|
||
|
}
|
||
|
|
||
|
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
|
||
|
// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false,
|
||
|
// only blocks that are smaller than the chunk size will be decoded and combined.
|
||
|
func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
|
||
|
if dedup {
|
||
|
for len(k.merged{{.Name}}Values) < k.size && len(k.blocks) > 0 {
|
||
|
for len(k.blocks) > 0 && k.blocks[0].read() {
|
||
|
k.blocks = k.blocks[1:]
|
||
|
}
|
||
|
|
||
|
if len(k.blocks) == 0 {
|
||
|
break
|
||
|
}
|
||
|
first := k.blocks[0]
|
||
|
minTime := first.minTime
|
||
|
maxTime := first.maxTime
|
||
|
|
||
|
// Adjust the min time to the start of any overlapping blocks.
|
||
|
for i := 0; i < len(k.blocks); i++ {
|
||
|
if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
|
||
|
if k.blocks[i].minTime < minTime {
|
||
|
minTime = k.blocks[i].minTime
|
||
|
}
|
||
|
if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
|
||
|
maxTime = k.blocks[i].maxTime
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// We have some overlapping blocks so decode all, append in order and then dedup
|
||
|
for i := 0; i < len(k.blocks); i++ {
|
||
|
if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{})
|
||
|
if err != nil {
|
||
|
k.err = err
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Remove values we already read
|
||
|
v = {{.Name}}Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
|
||
|
|
||
|
// Filter out only the values for overlapping block
|
||
|
v = {{.Name}}Values(v).Include(minTime, maxTime)
|
||
|
if len(v) > 0 {
|
||
|
// Record that we read a subset of the block
|
||
|
k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
|
||
|
}
|
||
|
|
||
|
// Apply each tombstone to the block
|
||
|
for _, ts := range k.blocks[i].tombstones {
|
||
|
v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max)
|
||
|
}
|
||
|
|
||
|
k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
|
||
|
|
||
|
// Allow other goroutines to run
|
||
|
runtime.Gosched()
|
||
|
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Since we combined multiple blocks, we could have more values than we should put into
|
||
|
// a single block. We need to chunk them up into groups and re-encode them.
|
||
|
return k.chunk{{.Name}}(nil)
|
||
|
} else {
|
||
|
var chunked blocks
|
||
|
var i int
|
||
|
|
||
|
for i < len(k.blocks) {
|
||
|
|
||
|
// skip this block if it's values were already read
|
||
|
if k.blocks[i].read() {
|
||
|
i++
|
||
|
continue
|
||
|
}
|
||
|
// If we this block is already full, just add it as is
|
||
|
if BlockCount(k.blocks[i].b) >= k.size {
|
||
|
chunked = append(chunked, k.blocks[i])
|
||
|
} else {
|
||
|
break
|
||
|
}
|
||
|
i++
|
||
|
// Allow other goroutines to run
|
||
|
runtime.Gosched()
|
||
|
}
|
||
|
|
||
|
if k.fast {
|
||
|
for i < len(k.blocks) {
|
||
|
// skip this block if it's values were already read
|
||
|
if k.blocks[i].read() {
|
||
|
i++
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
chunked = append(chunked, k.blocks[i])
|
||
|
i++
|
||
|
// Allow other goroutines to run
|
||
|
runtime.Gosched()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
|
||
|
if i == len(k.blocks)-1 {
|
||
|
if !k.blocks[i].read() {
|
||
|
chunked = append(chunked, k.blocks[i])
|
||
|
}
|
||
|
i++
|
||
|
}
|
||
|
|
||
|
// The remaining blocks can be combined and we know that they do not overlap and
|
||
|
// so we can just append each, sort and re-encode.
|
||
|
for i < len(k.blocks) && len(k.merged{{.Name}}Values) < k.size {
|
||
|
if k.blocks[i].read() {
|
||
|
i++
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{})
|
||
|
if err != nil {
|
||
|
k.err = err
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Apply each tombstone to the block
|
||
|
for _, ts := range k.blocks[i].tombstones {
|
||
|
v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max)
|
||
|
}
|
||
|
|
||
|
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
|
||
|
|
||
|
k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
|
||
|
i++
|
||
|
// Allow other goroutines to run
|
||
|
runtime.Gosched()
|
||
|
}
|
||
|
|
||
|
k.blocks = k.blocks[i:]
|
||
|
|
||
|
return k.chunk{{.Name}}(chunked)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// chunk{{.Name}} encodes at most k.size of the values buffered in
// k.merged{{.Name}}Values into one new block, appends that block to dst and
// returns dst. Values beyond the first k.size stay buffered for a later call.
// When nothing is buffered, dst is returned unchanged. If encoding fails, the
// error is stored in k.err, the buffer is left untouched and nil is returned.
func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks {
	pending := k.merged{{.Name}}Values
	if len(pending) == 0 {
		return dst
	}

	// By default the whole buffer becomes a single block and the buffer is
	// truncated to zero length. When more than k.size values are buffered,
	// only the first k.size are encoded and the rest remain buffered.
	head, rest := pending, pending[:0]
	if len(pending) > k.size {
		head, rest = pending[:k.size], pending[k.size:]
	}

	cb, err := {{.Name}}Values(head).Encode(nil)
	if err != nil {
		k.err = err
		return nil
	}

	dst = append(dst, &block{
		minTime: head[0].UnixNano(),
		maxTime: head[len(head)-1].UnixNano(),
		key:     k.key,
		b:       cb,
	})
	k.merged{{.Name}}Values = rest
	return dst
}
|
||
|
|
||
|
{{ end }}
|