Use typed values to avoid allocations

This switches compactions to use typed values (FloatValues) instead of the
generic Values type.  It avoids a bunch of allocations where each value
must be converted from a specific type to an interface{}.
pull/8193/head
Jason Wilder 2017-03-08 18:27:04 -07:00
parent a1c84ae6f3
commit ced953ae89
8 changed files with 1321 additions and 202 deletions

View File

@ -0,0 +1,867 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: compact.gen.go.tmpl
package tsm1
import (
"runtime"
)
// mergeFloat folds the next set of float blocks into k.merged, deciding first
// whether the pending blocks require a full decode/dedup pass.
func (k *tsmKeyIterator) mergeFloat() {
	// Nothing buffered anywhere: there is no work to do.
	if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedFloatValues) == 0 {
		return
	}

	// Any already-decoded pending values force the dedup path.
	needDedup := len(k.mergedFloatValues) != 0
	if !needDedup && len(k.blocks) > 0 {
		// Tombstones or a partially consumed first block may leave duplicates.
		needDedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
		if !needDedup {
			// Scan for any block overlapping its predecessor's time range,
			// partial reads, or tombstones — each can introduce duplicates.
			for i := 1; i < len(k.blocks); i++ {
				cur := k.blocks[i]
				if cur.partiallyRead() || cur.minTime <= k.blocks[i-1].maxTime || len(cur.tombstones) > 0 {
					needDedup = true
					break
				}
			}
		}
	}

	k.merged = k.combineFloat(needDedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combineFloat(dedup bool) blocks {
	if dedup {
		// Accumulate decoded values until a full chunk is buffered or blocks run out.
		for len(k.mergedFloatValues) < k.size && len(k.blocks) > 0 {
			// Drop leading blocks whose values were fully consumed already.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// NOTE(review): maxTime is compared against minTime here, not maxTime;
					// presumably intentional (shrink window to earliest overlapping end) — confirm.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeFloatBlock(k.blocks[i].b, &[]FloatValue{})
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = FloatValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping window
				v = FloatValues(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = FloatValues(v).Exclude(ts.Min, ts.Max)
				}

				k.mergedFloatValues = k.mergedFloatValues.Merge(v)

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkFloat(nil)
	} else {
		var chunked blocks
		var i int

		// Pass 1: forward already-full blocks untouched, stopping at the first
		// non-full unread block.
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		// Fast mode: forward every remaining unread block without re-encoding.
		if k.fast {
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.mergedFloatValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeFloatBlock(k.blocks[i].b, &[]FloatValue{})
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = FloatValues(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
			k.mergedFloatValues = k.mergedFloatValues.Merge(v)
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		k.blocks = k.blocks[i:]

		return k.chunkFloat(chunked)
	}
}
// chunkFloat encodes buffered float values into at most one new block per call:
// a full-size block when more than k.size values are pending, otherwise a final
// block holding the remainder. The new block (if any) is appended to dst.
func (k *tsmKeyIterator) chunkFloat(dst blocks) blocks {
	if len(k.mergedFloatValues) > k.size {
		// Encode exactly one full chunk and keep the tail for the next call.
		head := k.mergedFloatValues[:k.size]
		encoded, err := FloatValues(head).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		k.mergedFloatValues = k.mergedFloatValues[k.size:]
		return append(dst, &block{
			minTime: head[0].UnixNano(),
			maxTime: head[len(head)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
	}

	// Re-encode whatever remains into a single trailing block.
	if rem := k.mergedFloatValues; len(rem) > 0 {
		encoded, err := FloatValues(rem).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		dst = append(dst, &block{
			minTime: rem[0].UnixNano(),
			maxTime: rem[len(rem)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
		k.mergedFloatValues = k.mergedFloatValues[:0]
	}
	return dst
}
// mergeInteger folds the next set of integer blocks into k.merged, deciding first
// whether the pending blocks require a full decode/dedup pass.
func (k *tsmKeyIterator) mergeInteger() {
	// Nothing buffered anywhere: there is no work to do.
	if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedIntegerValues) == 0 {
		return
	}

	// Any already-decoded pending values force the dedup path.
	needDedup := len(k.mergedIntegerValues) != 0
	if !needDedup && len(k.blocks) > 0 {
		// Tombstones or a partially consumed first block may leave duplicates.
		needDedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
		if !needDedup {
			// Scan for any block overlapping its predecessor's time range,
			// partial reads, or tombstones — each can introduce duplicates.
			for i := 1; i < len(k.blocks); i++ {
				cur := k.blocks[i]
				if cur.partiallyRead() || cur.minTime <= k.blocks[i-1].maxTime || len(cur.tombstones) > 0 {
					needDedup = true
					break
				}
			}
		}
	}

	k.merged = k.combineInteger(needDedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combineInteger(dedup bool) blocks {
	if dedup {
		// Accumulate decoded values until a full chunk is buffered or blocks run out.
		for len(k.mergedIntegerValues) < k.size && len(k.blocks) > 0 {
			// Drop leading blocks whose values were fully consumed already.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// NOTE(review): maxTime is compared against minTime here, not maxTime;
					// presumably intentional (shrink window to earliest overlapping end) — confirm.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeIntegerBlock(k.blocks[i].b, &[]IntegerValue{})
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = IntegerValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping window
				v = IntegerValues(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = IntegerValues(v).Exclude(ts.Min, ts.Max)
				}

				k.mergedIntegerValues = k.mergedIntegerValues.Merge(v)

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkInteger(nil)
	} else {
		var chunked blocks
		var i int

		// Pass 1: forward already-full blocks untouched, stopping at the first
		// non-full unread block.
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		// Fast mode: forward every remaining unread block without re-encoding.
		if k.fast {
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.mergedIntegerValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeIntegerBlock(k.blocks[i].b, &[]IntegerValue{})
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = IntegerValues(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
			k.mergedIntegerValues = k.mergedIntegerValues.Merge(v)
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		k.blocks = k.blocks[i:]

		return k.chunkInteger(chunked)
	}
}
// chunkInteger encodes buffered integer values into at most one new block per call:
// a full-size block when more than k.size values are pending, otherwise a final
// block holding the remainder. The new block (if any) is appended to dst.
func (k *tsmKeyIterator) chunkInteger(dst blocks) blocks {
	if len(k.mergedIntegerValues) > k.size {
		// Encode exactly one full chunk and keep the tail for the next call.
		head := k.mergedIntegerValues[:k.size]
		encoded, err := IntegerValues(head).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		k.mergedIntegerValues = k.mergedIntegerValues[k.size:]
		return append(dst, &block{
			minTime: head[0].UnixNano(),
			maxTime: head[len(head)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
	}

	// Re-encode whatever remains into a single trailing block.
	if rem := k.mergedIntegerValues; len(rem) > 0 {
		encoded, err := IntegerValues(rem).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		dst = append(dst, &block{
			minTime: rem[0].UnixNano(),
			maxTime: rem[len(rem)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
		k.mergedIntegerValues = k.mergedIntegerValues[:0]
	}
	return dst
}
// mergeString folds the next set of string blocks into k.merged, deciding first
// whether the pending blocks require a full decode/dedup pass.
func (k *tsmKeyIterator) mergeString() {
	// Nothing buffered anywhere: there is no work to do.
	if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedStringValues) == 0 {
		return
	}

	// Any already-decoded pending values force the dedup path.
	needDedup := len(k.mergedStringValues) != 0
	if !needDedup && len(k.blocks) > 0 {
		// Tombstones or a partially consumed first block may leave duplicates.
		needDedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
		if !needDedup {
			// Scan for any block overlapping its predecessor's time range,
			// partial reads, or tombstones — each can introduce duplicates.
			for i := 1; i < len(k.blocks); i++ {
				cur := k.blocks[i]
				if cur.partiallyRead() || cur.minTime <= k.blocks[i-1].maxTime || len(cur.tombstones) > 0 {
					needDedup = true
					break
				}
			}
		}
	}

	k.merged = k.combineString(needDedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combineString(dedup bool) blocks {
	if dedup {
		// Accumulate decoded values until a full chunk is buffered or blocks run out.
		for len(k.mergedStringValues) < k.size && len(k.blocks) > 0 {
			// Drop leading blocks whose values were fully consumed already.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// NOTE(review): maxTime is compared against minTime here, not maxTime;
					// presumably intentional (shrink window to earliest overlapping end) — confirm.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeStringBlock(k.blocks[i].b, &[]StringValue{})
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = StringValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping window
				v = StringValues(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = StringValues(v).Exclude(ts.Min, ts.Max)
				}

				k.mergedStringValues = k.mergedStringValues.Merge(v)

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkString(nil)
	} else {
		var chunked blocks
		var i int

		// Pass 1: forward already-full blocks untouched, stopping at the first
		// non-full unread block.
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		// Fast mode: forward every remaining unread block without re-encoding.
		if k.fast {
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.mergedStringValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeStringBlock(k.blocks[i].b, &[]StringValue{})
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = StringValues(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
			k.mergedStringValues = k.mergedStringValues.Merge(v)
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		k.blocks = k.blocks[i:]

		return k.chunkString(chunked)
	}
}
// chunkString encodes buffered string values into at most one new block per call:
// a full-size block when more than k.size values are pending, otherwise a final
// block holding the remainder. The new block (if any) is appended to dst.
func (k *tsmKeyIterator) chunkString(dst blocks) blocks {
	if len(k.mergedStringValues) > k.size {
		// Encode exactly one full chunk and keep the tail for the next call.
		head := k.mergedStringValues[:k.size]
		encoded, err := StringValues(head).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		k.mergedStringValues = k.mergedStringValues[k.size:]
		return append(dst, &block{
			minTime: head[0].UnixNano(),
			maxTime: head[len(head)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
	}

	// Re-encode whatever remains into a single trailing block.
	if rem := k.mergedStringValues; len(rem) > 0 {
		encoded, err := StringValues(rem).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		dst = append(dst, &block{
			minTime: rem[0].UnixNano(),
			maxTime: rem[len(rem)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
		k.mergedStringValues = k.mergedStringValues[:0]
	}
	return dst
}
// mergeBoolean folds the next set of boolean blocks into k.merged, deciding first
// whether the pending blocks require a full decode/dedup pass.
func (k *tsmKeyIterator) mergeBoolean() {
	// Nothing buffered anywhere: there is no work to do.
	if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedBooleanValues) == 0 {
		return
	}

	// Any already-decoded pending values force the dedup path.
	needDedup := len(k.mergedBooleanValues) != 0
	if !needDedup && len(k.blocks) > 0 {
		// Tombstones or a partially consumed first block may leave duplicates.
		needDedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
		if !needDedup {
			// Scan for any block overlapping its predecessor's time range,
			// partial reads, or tombstones — each can introduce duplicates.
			for i := 1; i < len(k.blocks); i++ {
				cur := k.blocks[i]
				if cur.partiallyRead() || cur.minTime <= k.blocks[i-1].maxTime || len(cur.tombstones) > 0 {
					needDedup = true
					break
				}
			}
		}
	}

	k.merged = k.combineBoolean(needDedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks {
	if dedup {
		// Accumulate decoded values until a full chunk is buffered or blocks run out.
		for len(k.mergedBooleanValues) < k.size && len(k.blocks) > 0 {
			// Drop leading blocks whose values were fully consumed already.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// NOTE(review): maxTime is compared against minTime here, not maxTime;
					// presumably intentional (shrink window to earliest overlapping end) — confirm.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeBooleanBlock(k.blocks[i].b, &[]BooleanValue{})
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = BooleanValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping window
				v = BooleanValues(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = BooleanValues(v).Exclude(ts.Min, ts.Max)
				}

				k.mergedBooleanValues = k.mergedBooleanValues.Merge(v)

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkBoolean(nil)
	} else {
		var chunked blocks
		var i int

		// Pass 1: forward already-full blocks untouched, stopping at the first
		// non-full unread block.
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		// Fast mode: forward every remaining unread block without re-encoding.
		if k.fast {
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.mergedBooleanValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeBooleanBlock(k.blocks[i].b, &[]BooleanValue{})
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = BooleanValues(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
			k.mergedBooleanValues = k.mergedBooleanValues.Merge(v)
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		k.blocks = k.blocks[i:]

		return k.chunkBoolean(chunked)
	}
}
// chunkBoolean encodes buffered boolean values into at most one new block per call:
// a full-size block when more than k.size values are pending, otherwise a final
// block holding the remainder. The new block (if any) is appended to dst.
func (k *tsmKeyIterator) chunkBoolean(dst blocks) blocks {
	if len(k.mergedBooleanValues) > k.size {
		// Encode exactly one full chunk and keep the tail for the next call.
		head := k.mergedBooleanValues[:k.size]
		encoded, err := BooleanValues(head).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		k.mergedBooleanValues = k.mergedBooleanValues[k.size:]
		return append(dst, &block{
			minTime: head[0].UnixNano(),
			maxTime: head[len(head)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
	}

	// Re-encode whatever remains into a single trailing block.
	if rem := k.mergedBooleanValues; len(rem) > 0 {
		encoded, err := BooleanValues(rem).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}
		dst = append(dst, &block{
			minTime: rem[0].UnixNano(),
			maxTime: rem[len(rem)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
		k.mergedBooleanValues = k.mergedBooleanValues[:0]
	}
	return dst
}

View File

@ -0,0 +1,223 @@
package tsm1
import (
"runtime"
)
{{range .}}

// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) merge{{.Name}}() {
	// No blocks left, or pending merged values, we're done
	if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.merged{{.Name}}Values) == 0 {
		return
	}

	dedup := len(k.merged{{.Name}}Values) != 0
	if len(k.blocks) > 0 && !dedup {

		// If we have more than one block or any partially tombstoned blocks, we may need to dedup
		dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()

		// Quickly scan each block to see if any overlap with the prior block, if they overlap then
		// we need to dedup as there may be duplicate points now
		for i := 1; !dedup && i < len(k.blocks); i++ {
			if k.blocks[i].partiallyRead() {
				dedup = true
				break
			}

			if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
				dedup = true
				break
			}
		}

	}

	k.merged = k.combine{{.Name}}(dedup)
}

// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
	if dedup {
		for len(k.merged{{.Name}}Values) < k.size && len(k.blocks) > 0 {
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{})
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = {{.Name}}Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping window
				v = {{.Name}}Values(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max)
				}

				k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunk{{.Name}}(nil)
	} else {
		var chunked blocks
		var i int

		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		if k.fast {
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.merged{{.Name}}Values) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{})
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
			k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		k.blocks = k.blocks[i:]

		return k.chunk{{.Name}}(chunked)
	}
}

func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks {
	if len(k.merged{{.Name}}Values) > k.size {
		values := k.merged{{.Name}}Values[:k.size]
		cb, err := {{.Name}}Values(values).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: values[0].UnixNano(),
			maxTime: values[len(values)-1].UnixNano(),
			key:     k.key,
			b:       cb,
		})
		k.merged{{.Name}}Values = k.merged{{.Name}}Values[k.size:]
		return dst
	}

	// Re-encode the remaining values into the last block
	if len(k.merged{{.Name}}Values) > 0 {
		cb, err := {{.Name}}Values(k.merged{{.Name}}Values).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: k.merged{{.Name}}Values[0].UnixNano(),
			maxTime: k.merged{{.Name}}Values[len(k.merged{{.Name}}Values)-1].UnixNano(),
			key:     k.key,
			b:       cb,
		})
		k.merged{{.Name}}Values = k.merged{{.Name}}Values[:0]
	}

	return dst
}
{{ end }}

View File

@ -0,0 +1,18 @@
[
{
"Name":"Float",
"name":"float"
},
{
"Name":"Integer",
"name":"integer"
},
{
"Name":"String",
"name":"string"
},
{
"Name":"Boolean",
"name":"boolean"
}
]

View File

@ -887,6 +887,7 @@ type tsmKeyIterator struct {
// key is the current key lowest key across all readers that has not be fully exhausted
// of values.
key string
typ byte
iterators []*BlockIterator
blocks blocks
@ -894,7 +895,10 @@ type tsmKeyIterator struct {
buf []blocks
// mergeValues are decoded blocks that have been combined
mergedValues Values
mergedFloatValues FloatValues
mergedIntegerValues IntegerValues
mergedBooleanValues BooleanValues
mergedStringValues StringValues
// merged are encoded blocks that have been combined or used as is
// without decode
@ -904,6 +908,7 @@ type tsmKeyIterator struct {
type block struct {
key string
minTime, maxTime int64
typ byte
b []byte
tombstones []TimeRange
@ -966,6 +971,13 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
}, nil
}
// hasMergedValues reports whether any typed merged-value buffer still holds
// decoded values pending re-encoding.
func (k *tsmKeyIterator) hasMergedValues() bool {
	switch {
	case len(k.mergedFloatValues) > 0:
		return true
	case len(k.mergedIntegerValues) > 0:
		return true
	case len(k.mergedStringValues) > 0:
		return true
	case len(k.mergedBooleanValues) > 0:
		return true
	default:
		return false
	}
}
// Next returns true if there are any values remaining in the iterator.
func (k *tsmKeyIterator) Next() bool {
// Any merged blocks pending?
@ -977,9 +989,9 @@ func (k *tsmKeyIterator) Next() bool {
}
// Any merged values pending?
if len(k.mergedValues) > 0 {
if k.hasMergedValues() {
k.merge()
if len(k.merged) > 0 || len(k.mergedValues) > 0 {
if len(k.merged) > 0 || k.hasMergedValues() {
return true
}
}
@ -987,7 +999,7 @@ func (k *tsmKeyIterator) Next() bool {
// If we still have blocks from the last read, merge them
if len(k.blocks) > 0 {
k.merge()
if len(k.merged) > 0 || len(k.mergedValues) > 0 {
if len(k.merged) > 0 || k.hasMergedValues() {
return true
}
}
@ -997,7 +1009,7 @@ func (k *tsmKeyIterator) Next() bool {
if v == nil {
iter := k.iterators[i]
if iter.Next() {
key, minTime, maxTime, _, _, b, err := iter.Read()
key, minTime, maxTime, typ, _, b, err := iter.Read()
if err != nil {
k.err = err
}
@ -1009,6 +1021,7 @@ func (k *tsmKeyIterator) Next() bool {
minTime: minTime,
maxTime: maxTime,
key: key,
typ: typ,
b: b,
tombstones: tombstones,
readMin: math.MaxInt64,
@ -1018,7 +1031,7 @@ func (k *tsmKeyIterator) Next() bool {
blockKey := key
for iter.PeekNext() == blockKey {
iter.Next()
key, minTime, maxTime, _, _, b, err := iter.Read()
key, minTime, maxTime, typ, _, b, err := iter.Read()
if err != nil {
k.err = err
}
@ -1029,6 +1042,7 @@ func (k *tsmKeyIterator) Next() bool {
minTime: minTime,
maxTime: maxTime,
key: key,
typ: typ,
b: b,
tombstones: tombstones,
readMin: math.MaxInt64,
@ -1042,6 +1056,7 @@ func (k *tsmKeyIterator) Next() bool {
// Each reader could have a different key that it's currently at, need to find
// the next smallest one to keep the sort ordering.
var minKey string
var minType byte
for _, b := range k.buf {
// block could be nil if the iterator has been exhausted for that file
if len(b) == 0 {
@ -1049,9 +1064,11 @@ func (k *tsmKeyIterator) Next() bool {
}
if minKey == "" || b[0].key < minKey {
minKey = b[0].key
minType = b[0].typ
}
}
k.key = minKey
k.typ = minType
// Now we need to find all blocks that match the min key so we can combine and dedupe
// the blocks if necessary
@ -1076,203 +1093,18 @@ func (k *tsmKeyIterator) Next() bool {
// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) merge() {
// No blocks left, or pending merged values, we're done
if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedValues) == 0 {
return
switch k.typ {
case BlockFloat64:
k.mergeFloat()
case BlockInteger:
k.mergeInteger()
case BlockBoolean:
k.mergeBoolean()
case BlockString:
k.mergeString()
default:
k.err = fmt.Errorf("unknown block type: %v", k.typ)
}
dedup := len(k.mergedValues) > 0
if len(k.blocks) > 0 && !dedup {
// If we have more than one block or any partially tombstoned blocks, we many need to dedup
dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
// Quickly scan each block to see if any overlap with the prior block, if they overlap then
// we need to dedup as there may be duplicate points now
for i := 1; !dedup && i < len(k.blocks); i++ {
if k.blocks[i].partiallyRead() {
dedup = true
break
}
if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
dedup = true
break
}
}
}
k.merged = k.combine(dedup)
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
//
// On a decode/encode error, k.err is set and nil is returned.
func (k *tsmKeyIterator) combine(dedup bool) blocks {
	if dedup {
		// Slow path: blocks may overlap or carry tombstones, so decode every
		// block overlapping the first unread block's time range, merge the
		// values (dropping duplicates) into k.mergedValues, and stop once a
		// full chunk has accumulated.
		for len(k.mergedValues) < k.size && len(k.blocks) > 0 {
			// Drop any blocks whose values have been fully consumed already.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeBlock(k.blocks[i].b, nil)
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping window
				v = Values(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = Values(v).Exclude(ts.Min, ts.Max)
				}

				// Merge dedups and keeps the combined values time-sorted.
				k.mergedValues = k.mergedValues.Merge(v)
			}

			// The first block has been fully folded into mergedValues.
			k.blocks = k.blocks[1:]
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunk(nil)
	} else {
		// Fast path: blocks are known not to overlap and have no tombstones,
		// so already-full blocks can be passed through without decoding; only
		// the undersized tail needs to be decoded and re-encoded.
		var chunked blocks
		var i int

		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++
		}

		if k.fast {
			// Fast compactions pass every remaining block through untouched.
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++
			}
		}

		// If we only have one block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.mergedValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeBlock(k.blocks[i].b, nil)
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = Values(v).Exclude(ts.Min, ts.Max)
			}

			// The whole block was consumed.
			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

			k.mergedValues = k.mergedValues.Merge(v)
			i++
		}
		k.blocks = k.blocks[i:]

		return k.chunk(chunked)
	}
}
// chunk drains buffered merged values into encoded blocks appended to dst.
// When more than k.size values are buffered, exactly k.size of them are
// encoded into one block and the remainder stays buffered for a later call;
// otherwise whatever is left (possibly nothing) is flushed as a final,
// smaller block. On an encoding failure k.err is set and nil is returned.
func (k *tsmKeyIterator) chunk(dst blocks) blocks {
	if len(k.mergedValues) > k.size {
		// Full chunk available: emit exactly k.size values and keep the rest.
		head := k.mergedValues[:k.size]
		encoded, err := Values(head).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		k.mergedValues = k.mergedValues[k.size:]
		return append(dst, &block{
			minTime: head[0].UnixNano(),
			maxTime: head[len(head)-1].UnixNano(),
			key:     k.key,
			b:       encoded,
		})
	}

	if len(k.mergedValues) == 0 {
		// Nothing buffered; hand dst back untouched.
		return dst
	}

	// Re-encode the remaining values into the last block.
	rest := k.mergedValues
	encoded, err := Values(rest).Encode(nil)
	if err != nil {
		k.err = err
		return nil
	}

	k.mergedValues = k.mergedValues[:0]
	return append(dst, &block{
		minTime: rest[0].UnixNano(),
		maxTime: rest[len(rest)-1].UnixNano(),
		key:     k.key,
		b:       encoded,
	})
}
func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {

View File

@ -370,6 +370,10 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
return a
}
// Encode encodes the values into a TSM float block; buf is passed through
// to the underlying block encoder as a reusable destination buffer.
func (a FloatValues) Encode(buf []byte) ([]byte, error) {
	return encodeFloatValuesBlock(buf, a)
}
// Sort methods

// Len reports the number of values in the slice.
func (a FloatValues) Len() int { return len(a) }

// Swap exchanges the values at indices i and j.
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -552,6 +556,10 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
return a
}
// Encode encodes the values into a TSM integer block; buf is passed through
// to the underlying block encoder as a reusable destination buffer.
func (a IntegerValues) Encode(buf []byte) ([]byte, error) {
	return encodeIntegerValuesBlock(buf, a)
}
// Sort methods

// Len reports the number of values in the slice.
func (a IntegerValues) Len() int { return len(a) }

// Swap exchanges the values at indices i and j.
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -734,6 +742,10 @@ func (a StringValues) Merge(b StringValues) StringValues {
return a
}
// Encode encodes the values into a TSM string block; buf is passed through
// to the underlying block encoder as a reusable destination buffer.
func (a StringValues) Encode(buf []byte) ([]byte, error) {
	return encodeStringValuesBlock(buf, a)
}
// Sort methods

// Len reports the number of values in the slice.
func (a StringValues) Len() int { return len(a) }

// Swap exchanges the values at indices i and j.
func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -916,6 +928,10 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
return a
}
// Encode encodes the values into a TSM boolean block; buf is passed through
// to the underlying block encoder as a reusable destination buffer.
func (a BooleanValues) Encode(buf []byte) ([]byte, error) {
	return encodeBooleanValuesBlock(buf, a)
}
// Sort methods

// Len reports the number of values in the slice.
func (a BooleanValues) Len() int { return len(a) }

// Swap exchanges the values at indices i and j.
func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View File

@ -185,6 +185,12 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
return a
}
{{ if ne .Name "" }}
func (a {{.Name}}Values) Encode(buf []byte) ([]byte, error) {
return encode{{.Name}}ValuesBlock(buf, a)
}
{{ end }}
// Sort methods
func (a {{.Name}}Values) Len() int { return len(a) }
func (a {{.Name}}Values) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View File

@ -366,6 +366,53 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// encodeFloatValuesBlock encodes values into a TSM float block, appending
// the packed block to buf. An empty input encodes to (nil, nil).
func encodeFloatValuesBlock(buf []byte, values []FloatValue) ([]byte, error) {
	if len(values) == 0 {
		return nil, nil
	}

	// A float block is encoded using different compression strategies
	// for timestamps and values.

	// Encode values using Gorilla float compression
	venc := getFloatEncoder()

	// Encode timestamps using an adaptive encoder that uses delta-encoding,
	// frame-of-reference and run length encoding.
	tsenc := getTimeEncoder(len(values))

	// The closure exists so that the pooled encoders are always returned
	// below, even when encoding fails partway through.
	var b []byte
	err := func() error {
		for _, v := range values {
			tsenc.Write(v.unixnano)
			venc.Push(v.value)
		}
		// Gorilla encoding must be finalized before the bytes are read.
		venc.Finish()

		// Encoded timestamp values
		tb, err := tsenc.Bytes()
		if err != nil {
			return err
		}
		// Encoded float values
		vb, err := venc.Bytes()
		if err != nil {
			return err
		}

		// Prepend the first timestamp of the block in the first 8 bytes and the block
		// type in the next byte, followed by the block
		b = packBlock(buf, BlockFloat64, tb, vb)
		return nil
	}()

	// Return the encoders to their pools regardless of the outcome.
	putTimeEncoder(tsenc)
	putFloatEncoder(venc)

	return b, err
}
// DecodeFloatBlock decodes the float block from the byte slice
// and appends the float values to a.
func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
@ -496,6 +543,48 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// encodeBooleanValuesBlock encodes values into a TSM boolean block, appending
// the packed block to buf. An empty input encodes to (nil, nil).
func encodeBooleanValuesBlock(buf []byte, values []BooleanValue) ([]byte, error) {
	if len(values) == 0 {
		return nil, nil
	}

	// A boolean block is encoded using different compression strategies
	// for timestamps and values.
	venc := getBooleanEncoder(len(values))

	// Encode timestamps using an adaptive encoder
	tsenc := getTimeEncoder(len(values))

	// The closure exists so that the pooled encoders are always returned
	// below, even when encoding fails partway through.
	var b []byte
	err := func() error {
		for _, v := range values {
			tsenc.Write(v.unixnano)
			venc.Write(v.value)
		}

		// Encoded timestamp values
		tb, err := tsenc.Bytes()
		if err != nil {
			return err
		}
		// Encoded boolean values
		vb, err := venc.Bytes()
		if err != nil {
			return err
		}

		// Prepend the first timestamp of the block in the first 8 bytes and the block
		// type in the next byte, followed by the block
		b = packBlock(buf, BlockBoolean, tb, vb)
		return nil
	}()

	// Return the encoders to their pools regardless of the outcome.
	putTimeEncoder(tsenc)
	putBooleanEncoder(venc)

	return b, err
}
// DecodeBooleanBlock decodes the boolean block from the byte slice
// and appends the boolean values to a.
func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) {
@ -613,6 +702,39 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// encodeIntegerValuesBlock encodes values into a TSM integer block, appending
// the packed block to buf. An empty input encodes to (nil, nil).
func encodeIntegerValuesBlock(buf []byte, values []IntegerValue) ([]byte, error) {
	// Match the float/boolean encoders: encoding zero values must not pack
	// a (malformed) empty block.
	if len(values) == 0 {
		return nil, nil
	}

	tsEnc := getTimeEncoder(len(values))
	vEnc := getIntegerEncoder(len(values))

	// The closure exists so that the pooled encoders are always returned
	// below, even when encoding fails partway through.
	var b []byte
	err := func() error {
		for _, v := range values {
			tsEnc.Write(v.unixnano)
			vEnc.Write(v.value)
		}

		// Encoded timestamp values
		tb, err := tsEnc.Bytes()
		if err != nil {
			return err
		}
		// Encoded int64 values
		vb, err := vEnc.Bytes()
		if err != nil {
			return err
		}

		// Prepend the first timestamp of the block in the first 8 bytes
		b = packBlock(buf, BlockInteger, tb, vb)
		return nil
	}()

	// Return the encoders to their pools regardless of the outcome.
	putTimeEncoder(tsEnc)
	putIntegerEncoder(vEnc)

	return b, err
}
// DecodeIntegerBlock decodes the integer block from the byte slice
// and appends the integer values to a.
func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) {
@ -732,6 +854,40 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// encodeStringValuesBlock encodes values into a TSM string block, appending
// the packed block to buf. An empty input encodes to (nil, nil).
func encodeStringValuesBlock(buf []byte, values []StringValue) ([]byte, error) {
	// Guard required: the size hint below indexes values[0], which would
	// panic on an empty slice. Also matches the float/boolean encoders'
	// empty-input behavior.
	if len(values) == 0 {
		return nil, nil
	}

	tsEnc := getTimeEncoder(len(values))
	// Size hint: assume the first string is representative of the rest.
	vEnc := getStringEncoder(len(values) * len(values[0].value))

	// The closure exists so that the pooled encoders are always returned
	// below, even when encoding fails partway through.
	var b []byte
	err := func() error {
		for _, v := range values {
			tsEnc.Write(v.unixnano)
			vEnc.Write(v.value)
		}

		// Encoded timestamp values
		tb, err := tsEnc.Bytes()
		if err != nil {
			return err
		}
		// Encoded string values
		vb, err := vEnc.Bytes()
		if err != nil {
			return err
		}

		// Prepend the first timestamp of the block in the first 8 bytes
		b = packBlock(buf, BlockString, tb, vb)
		return nil
	}()

	// Return the encoders to their pools regardless of the outcome.
	putTimeEncoder(tsEnc)
	putStringEncoder(vEnc)

	return b, err
}
// DecodeStringBlock decodes the string block from the byte slice
// and appends the string values to a.
func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) {

View File

@ -26,6 +26,7 @@ import (
//go:generate tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl
//go:generate tmpl -data=@file_store.gen.go.tmpldata file_store.gen.go.tmpl
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
func init() {
tsdb.RegisterEngine("tsm1", NewEngine)