Revert "Fix memory spike when compacting overwritten points"

This reverts commit d99c5e26f6.
pull/6637/head
Jason Wilder 2016-05-16 09:30:34 -06:00
parent 6c3e76e0ce
commit 23fc9ff748
2 changed files with 76 additions and 181 deletions

View File

@ -640,13 +640,6 @@ type tsmKeyIterator struct {
blocks blocks
buf []blocks
// mergeValues are decoded blocks that have been combined
mergedValues Values
// merged are encoded blocks that have been combined or used as is
// without decode
merged blocks
}
type block struct {
@ -654,28 +647,6 @@ type block struct {
minTime, maxTime int64
b []byte
tombstones []TimeRange
// readMin, readMax are the timestamps range of values have been
// read and encoded from this block.
readMin, readMax int64
}
func (b *block) overlapsTimeRange(min, max int64) bool {
return b.minTime <= max && b.maxTime >= min
}
func (b *block) read() bool {
return b.readMin <= b.minTime && b.readMax >= b.maxTime
}
func (b *block) markRead(min, max int64) {
if min < b.readMin {
b.readMin = min
}
if max > b.readMax {
b.readMax = max
}
}
type blocks []*block
@ -709,26 +680,11 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
}
func (k *tsmKeyIterator) Next() bool {
// Any merged blocks pending?
if len(k.merged) > 0 {
k.merged = k.merged[1:]
if len(k.merged) > 0 {
return true
}
}
// Any merged values pending?
if len(k.mergedValues) > 0 {
k.merge()
if len(k.merged) > 0 {
return true
}
}
// If we still have blocks from the last read, merge them
// If we still have blocks from the last read, slice off the current one
// and return
if len(k.blocks) > 0 {
k.merge()
if len(k.merged) > 0 {
k.blocks = k.blocks[1:]
if len(k.blocks) > 0 {
return true
}
}
@ -746,14 +702,14 @@ func (k *tsmKeyIterator) Next() bool {
// This block may have ranges of time removed from it that would
// reduce the block min and max time.
tombstones := iter.r.TombstoneRange(key)
minTime, maxTime = k.clampTombstoneRange(tombstones, minTime, maxTime)
k.buf[i] = append(k.buf[i], &block{
minTime: minTime,
maxTime: maxTime,
key: key,
b: b,
tombstones: tombstones,
readMin: math.MaxInt64,
readMax: math.MinInt64,
})
blockKey := key
@ -765,6 +721,7 @@ func (k *tsmKeyIterator) Next() bool {
}
tombstones := iter.r.TombstoneRange(key)
minTime, maxTime = k.clampTombstoneRange(tombstones, minTime, maxTime)
k.buf[i] = append(k.buf[i], &block{
minTime: minTime,
@ -772,8 +729,6 @@ func (k *tsmKeyIterator) Next() bool {
key: key,
b: b,
tombstones: tombstones,
readMin: math.MaxInt64,
readMax: math.MinInt64,
})
}
}
@ -792,7 +747,6 @@ func (k *tsmKeyIterator) Next() bool {
minKey = b[0].key
}
}
k.key = minKey
// Now we need to find all blocks that match the min key so we can combine and dedupe
// the blocks if necessary
@ -800,26 +754,20 @@ func (k *tsmKeyIterator) Next() bool {
if len(b) == 0 {
continue
}
if b[0].key == k.key {
if b[0].key == minKey {
k.blocks = append(k.blocks, b...)
k.buf[i] = nil
}
}
// No blocks left, we're done
if len(k.blocks) == 0 {
return false
}
k.merge()
return len(k.merged) > 0
}
// merge combines the next set of blocks into merged blocks
func (k *tsmKeyIterator) merge() {
// No blocks left, we're done
if len(k.blocks) == 0 {
return
// Only one block and no tombstoned values; just return early — everything after this is wasted work
if len(k.blocks) == 1 && len(k.blocks[0].tombstones) == 0 {
return true
}
// If we have more than one block or any partially tombstoned blocks, we may need to dedup
@ -831,10 +779,6 @@ func (k *tsmKeyIterator) merge() {
// Quickly scan each block to see if any overlap with the prior block, if they overlap then
// we need to dedup as there may be duplicate points now
for i := 1; !dedup && i < len(k.blocks); i++ {
if k.blocks[i].read() {
dedup = true
break
}
if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
dedup = true
break
@ -842,69 +786,42 @@ func (k *tsmKeyIterator) merge() {
}
}
k.merged = k.combine(dedup)
k.blocks = k.combine(dedup)
return len(k.blocks) > 0
}
// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduplicated and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combine(dedup bool) blocks {
var decoded Values
if dedup {
for len(k.mergedValues) < k.size && len(k.blocks) > 0 {
for len(k.blocks) > 0 && k.blocks[0].read() {
k.blocks = k.blocks[1:]
// We have some overlapping blocks so decode all, append in order and then dedup
for i := 0; i < len(k.blocks); i++ {
v, err := DecodeBlock(k.blocks[i].b, nil)
if err != nil {
k.err = err
return nil
}
if len(k.blocks) == 0 {
break
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v = Values(v).Exclude(ts.Min, ts.Max)
}
first := k.blocks[0]
decoded = append(decoded, v...)
// We have some overlapping blocks so decode all, append in order and then dedup
for i := 0; i < len(k.blocks); i++ {
if !k.blocks[i].overlapsTimeRange(first.minTime, first.maxTime) || k.blocks[i].read() {
continue
}
v, err := DecodeBlock(k.blocks[i].b, nil)
if err != nil {
k.err = err
return nil
}
// Remove values we already read
v = Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
// Filter out only the values for overlapping block
v = Values(v).Include(first.minTime, first.maxTime)
if len(v) > 0 {
// Record that we read a subset of the block
k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v = Values(v).Exclude(ts.Min, ts.Max)
}
k.mergedValues = k.mergedValues.Merge(v)
}
k.blocks = k.blocks[1:]
}
decoded = decoded.Deduplicate()
// Since we combined multiple blocks, we could have more values than we should put into
// a single block. We need to chunk them up into groups and re-encode them.
return k.chunk(nil)
return k.chunk(nil, decoded)
} else {
var chunked blocks
var i int
for i < len(k.blocks) {
// skip this block if its values were already read
if k.blocks[i].read() {
i++
continue
}
// If this block is already full, just add it as is
if BlockCount(k.blocks[i].b) >= k.size {
chunked = append(chunked, k.blocks[i])
@ -916,12 +833,6 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
if k.fast {
for i < len(k.blocks) {
// skip this block if its values were already read
if k.blocks[i].read() {
i++
continue
}
chunked = append(chunked, k.blocks[i])
i++
}
@ -929,48 +840,47 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
// If we only have 1 block left, just append it as is and avoid decoding/recoding
if i == len(k.blocks)-1 {
if !k.blocks[i].read() {
chunked = append(chunked, k.blocks[i])
}
chunked = append(chunked, k.blocks[i])
i++
}
// The remaining blocks can be combined and we know that they do not overlap and
// so we can just append each, sort and re-encode.
for i < len(k.blocks) && len(k.mergedValues) < k.size {
if k.blocks[i].read() {
i++
continue
}
for i < len(k.blocks) {
v, err := DecodeBlock(k.blocks[i].b, nil)
if err != nil {
k.err = err
return nil
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v = Values(v).Exclude(ts.Min, ts.Max)
}
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
k.mergedValues = k.mergedValues.Merge(v)
decoded = append(decoded, v...)
i++
}
k.blocks = k.blocks[i:]
return k.chunk(chunked)
sort.Sort(Values(decoded))
return k.chunk(chunked, decoded)
}
}
func (k *tsmKeyIterator) chunk(dst blocks) blocks {
k.mergedValues.assertOrdered()
func (k *tsmKeyIterator) chunk(dst blocks, values []Value) blocks {
for len(values) > k.size {
cb, err := Values(values[:k.size]).Encode(nil)
if err != nil {
k.err = err
return nil
}
for len(k.mergedValues) > k.size {
values := k.mergedValues[:k.size]
dst = append(dst, &block{
minTime: values[0].UnixNano(),
maxTime: values[k.size-1].UnixNano(),
key: k.blocks[0].key,
b: cb,
})
values = values[k.size:]
}
// Re-encode the remaining values into the last block
if len(values) > 0 {
cb, err := Values(values).Encode(nil)
if err != nil {
k.err = err
@ -980,38 +890,31 @@ func (k *tsmKeyIterator) chunk(dst blocks) blocks {
dst = append(dst, &block{
minTime: values[0].UnixNano(),
maxTime: values[len(values)-1].UnixNano(),
key: k.key,
key: k.blocks[0].key,
b: cb,
})
k.mergedValues = k.mergedValues[k.size:]
return dst
}
// Re-encode the remaining values into the last block
if len(k.mergedValues) > 0 {
cb, err := Values(k.mergedValues).Encode(nil)
if err != nil {
k.err = err
return nil
}
dst = append(dst, &block{
minTime: k.mergedValues[0].UnixNano(),
maxTime: k.mergedValues[len(k.mergedValues)-1].UnixNano(),
key: k.key,
b: cb,
})
k.mergedValues = k.mergedValues[:0]
}
return dst
}
func (k *tsmKeyIterator) clampTombstoneRange(tombstones []TimeRange, minTime, maxTime int64) (int64, int64) {
for _, t := range tombstones {
if t.Min > minTime {
minTime = t.Min
}
if t.Max < maxTime {
maxTime = t.Max
}
}
return minTime, maxTime
}
func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
if len(k.merged) == 0 {
if len(k.blocks) == 0 {
return "", 0, 0, nil, k.err
}
block := k.merged[0]
block := k.blocks[0]
return block.key, block.minTime, block.maxTime, block.b, k.err
}

View File

@ -612,24 +612,19 @@ func TestTSMKeyIterator_Single(t *testing.T) {
// Tests that a single TSM file can be read and iterated over
func TestTSMKeyIterator_Chunked(t *testing.T) {
t.Skip("fixme")
dir := MustTempDir()
defer os.RemoveAll(dir)
v0 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v0},
}
r1 := MustTSMReader(dir, 1, writes)
v1 := tsm1.NewValue(2, 2.1)
writes1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v0, v1},
}
r2 := MustTSMReader(dir, 2, writes1)
r := MustTSMReader(dir, 1, writes)
iter, err := tsm1.NewTSMKeyIterator(2, false, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -651,20 +646,17 @@ func TestTSMKeyIterator_Chunked(t *testing.T) {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 2; got != exp {
if got, exp := len(values), len(writes); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = len(values) > 0
assertValueEqual(t, values[0], v0)
assertValueEqual(t, values[1], v1)
for _, v := range values {
readValues = true
assertValueEqual(t, v, writes["cpu,host=A#!~#value"][chunk])
}
chunk++
}
if got, exp := chunk, 1; got != exp {
t.Fatalf("chunk count mismatch: got %v, exp %v", got, exp)
}
if !readValues {
t.Fatalf("failed to read any values")
}