Merge pull request #9267 from hpbieker/hpb-compacting-sorting
Sort blocks by time when compactingpull/9279/head
commit
bf66f20388
|
@ -30,6 +30,7 @@
|
|||
- [#9226](https://github.com/influxdata/influxdb/issues/9226): Allow lone boolean literals in a condition expression.
|
||||
- [#9235](https://github.com/influxdata/influxdb/pull/9235): Improve performance when writes exceed `max-values-per-tag` or `max-series`.
|
||||
- [#9216](https://github.com/influxdata/influxdb/issues/9216): Prevent a panic when a query simultaneously finishes and is killed at the same time.
|
||||
- [#9255](https://github.com/influxdata/influxdb/issues/9255): Fix missing sorting of blocks by time when compacting.
|
||||
|
||||
## v1.4.3 [unreleased]
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ func (k *tsmKeyIterator) mergeFloat() {
|
|||
// we need to dedup as there may be duplicate points now
|
||||
for i := 1; !dedup && i < len(k.blocks); i++ {
|
||||
dedup = k.blocks[i].partiallyRead() ||
|
||||
k.blocks[i].minTime <= k.blocks[i-1].maxTime ||
|
||||
k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
|
||||
len(k.blocks[i].tombstones) > 0
|
||||
}
|
||||
|
||||
|
@ -219,7 +219,7 @@ func (k *tsmKeyIterator) mergeInteger() {
|
|||
// we need to dedup as there may be duplicate points now
|
||||
for i := 1; !dedup && i < len(k.blocks); i++ {
|
||||
dedup = k.blocks[i].partiallyRead() ||
|
||||
k.blocks[i].minTime <= k.blocks[i-1].maxTime ||
|
||||
k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
|
||||
len(k.blocks[i].tombstones) > 0
|
||||
}
|
||||
|
||||
|
@ -613,7 +613,7 @@ func (k *tsmKeyIterator) mergeString() {
|
|||
// we need to dedup as there may be duplicate points now
|
||||
for i := 1; !dedup && i < len(k.blocks); i++ {
|
||||
dedup = k.blocks[i].partiallyRead() ||
|
||||
k.blocks[i].minTime <= k.blocks[i-1].maxTime ||
|
||||
k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
|
||||
len(k.blocks[i].tombstones) > 0
|
||||
}
|
||||
|
||||
|
@ -810,7 +810,7 @@ func (k *tsmKeyIterator) mergeBoolean() {
|
|||
// we need to dedup as there may be duplicate points now
|
||||
for i := 1; !dedup && i < len(k.blocks); i++ {
|
||||
dedup = k.blocks[i].partiallyRead() ||
|
||||
k.blocks[i].minTime <= k.blocks[i-1].maxTime ||
|
||||
k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
|
||||
len(k.blocks[i].tombstones) > 0
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
{{range .}}
|
||||
|
||||
// merge combines the next set of blocks into merged blocks.
|
||||
|
@ -9,6 +13,8 @@ func (k *tsmKeyIterator) merge{{.Name}}() {
|
|||
return
|
||||
}
|
||||
|
||||
sort.Stable(k.blocks)
|
||||
|
||||
dedup := len(k.merged{{.Name}}Values) != 0
|
||||
if len(k.blocks) > 0 && !dedup {
|
||||
// If we have more than one block or any partially tombstoned blocks, we many need to dedup
|
||||
|
|
|
@ -1270,7 +1270,7 @@ func (a blocks) Len() int { return len(a) }
|
|||
func (a blocks) Less(i, j int) bool {
|
||||
cmp := bytes.Compare(a[i].key, a[j].key)
|
||||
if cmp == 0 {
|
||||
return a[i].minTime < a[j].minTime
|
||||
return a[i].minTime < a[j].minTime && a[i].maxTime < a[j].minTime
|
||||
}
|
||||
return cmp < 0
|
||||
}
|
||||
|
|
|
@ -347,6 +347,75 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCompactor_Compact_UnsortedBlocks(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
// write 2 TSM files with different data and one new point
|
||||
a1 := tsm1.NewValue(4, 1.1)
|
||||
a2 := tsm1.NewValue(5, 1.1)
|
||||
a3 := tsm1.NewValue(6, 1.1)
|
||||
|
||||
writes := map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": []tsm1.Value{a1, a2, a3},
|
||||
}
|
||||
f1 := MustWriteTSM(dir, 1, writes)
|
||||
|
||||
b1 := tsm1.NewValue(1, 1.2)
|
||||
b2 := tsm1.NewValue(2, 1.2)
|
||||
b3 := tsm1.NewValue(3, 1.2)
|
||||
|
||||
writes = map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": []tsm1.Value{b1, b2, b3},
|
||||
}
|
||||
f2 := MustWriteTSM(dir, 2, writes)
|
||||
|
||||
compactor := &tsm1.Compactor{
|
||||
Dir: dir,
|
||||
FileStore: &fakeFileStore{},
|
||||
Size: 2,
|
||||
}
|
||||
|
||||
compactor.Open()
|
||||
|
||||
files, err := compactor.CompactFast([]string{f1, f2})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error writing snapshot: %v", err)
|
||||
}
|
||||
|
||||
if got, exp := len(files), 1; got != exp {
|
||||
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
r := MustOpenTSMReader(files[0])
|
||||
|
||||
if got, exp := r.KeyCount(), 1; got != exp {
|
||||
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
var data = []struct {
|
||||
key string
|
||||
points []tsm1.Value
|
||||
}{
|
||||
{"cpu,host=A#!~#value", []tsm1.Value{b1, b2, b3, a1, a2, a3}},
|
||||
}
|
||||
|
||||
for _, p := range data {
|
||||
values, err := r.ReadAll([]byte(p.key))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading: %v", err)
|
||||
}
|
||||
|
||||
if got, exp := len(values), len(p.points); got != exp {
|
||||
t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
|
||||
}
|
||||
|
||||
for i, point := range p.points {
|
||||
assertValueEqual(t, values[i], point)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensures that a compaction will properly merge multiple TSM files
|
||||
func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
|
|
Loading…
Reference in New Issue