From eab012ef61ddb5b2e83cf1cd32fc78799c8da37d Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Fri, 3 Mar 2017 12:33:35 -0700
Subject: [PATCH] Fix points missing after compaction

If blocks containing overlapping ranges of time were partially recombined,
it was possible for some points to get dropped during compactions. This
occurred because the window of time of the points we need to merge did not
account for the partial blocks created from a prior merge.

Fixes #8084
---
 CHANGELOG.md                     |   1 +
 tsdb/engine/tsm1/compact.go      |  49 ++++++----
 tsdb/engine/tsm1/compact_test.go | 149 +++++++++++++++++++++++++++++++
 3 files changed, 182 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4819b10ead..52ba9ab478 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
 - [#8044](https://github.com/influxdata/influxdb/issues/8044): Treat non-reserved measurement names with underscores as normal measurements.
 - [#8078](https://github.com/influxdata/influxdb/issues/8078): Map types correctly when selecting a field with multiple measurements where one of the measurements is empty.
 - [#8080](https://github.com/influxdata/influxdb/issues/8080): Point.UnmarshalBinary() bounds check
+- [#8084](https://github.com/influxdata/influxdb/issues/8084): Points missing after compaction
 - [#8085](https://github.com/influxdata/influxdb/issues/8085): panic: interface conversion: tsm1.Value is tsm1.IntegerValue, not tsm1.FloatValue.
 - [#8095](https://github.com/influxdata/influxdb/pull/8095): Fix race in WALEntry.Encode and Values.Deduplicate

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index b43ce866cd..8e5548fd01 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -930,6 +930,10 @@ func (b *block) markRead(min, max int64) {
     }
 }
 
+func (b *block) partiallyRead() bool {
+    return b.readMin != b.minTime || b.readMax != b.maxTime
+}
+
 type blocks []*block
 
 func (a blocks) Len() int { return len(a) }
@@ -1077,25 +1081,25 @@ func (k *tsmKeyIterator) merge() {
         return
     }
 
-    dedup := false
-    if len(k.blocks) > 0 {
+    dedup := len(k.mergedValues) > 0
+    if len(k.blocks) > 0 && !dedup {
         // If we have more than one block or any partially tombstoned blocks, we many need to dedup
-        dedup = len(k.blocks[0].tombstones) > 0
+        dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
 
-        if len(k.blocks) > 1 {
-            // Quickly scan each block to see if any overlap with the prior block, if they overlap then
-            // we need to dedup as there may be duplicate points now
-            for i := 1; !dedup && i < len(k.blocks); i++ {
-                if k.blocks[i].read() {
-                    dedup = true
-                    break
-                }
-                if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
-                    dedup = true
-                    break
-                }
+        // Quickly scan each block to see if any overlap with the prior block, if they overlap then
+        // we need to dedup as there may be duplicate points now
+        for i := 1; !dedup && i < len(k.blocks); i++ {
+            if k.blocks[i].partiallyRead() {
+                dedup = true
+                break
+            }
+
+            if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
+                dedup = true
+                break
             }
         }
+
     }
 
     k.merged = k.combine(dedup)
@@ -1115,10 +1119,21 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
             break
         }
         first := k.blocks[0]
+        minTime := first.minTime
+        maxTime := first.maxTime
+
+        // Adjust the min time to the start of any overlapping blocks.
+        for i := 0; i < len(k.blocks); i++ {
+            if k.blocks[i].overlapsTimeRange(minTime, maxTime) {
+                if k.blocks[i].minTime < minTime {
+                    minTime = k.blocks[i].minTime
+                }
+            }
+        }
 
         // We have some overlapping blocks so decode all, append in order and then dedup
         for i := 0; i < len(k.blocks); i++ {
-            if !k.blocks[i].overlapsTimeRange(first.minTime, first.maxTime) || k.blocks[i].read() {
+            if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
                 continue
             }
 
@@ -1132,7 +1147,7 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
             v = Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
 
             // Filter out only the values for overlapping block
-            v = Values(v).Include(first.minTime, first.maxTime)
+            v = Values(v).Include(minTime, maxTime)
             if len(v) > 0 {
                 // Record that we read a subset of the block
                 k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index d8d58a5e83..922b8c8a28 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -191,6 +191,155 @@ func TestCompactor_CompactFull(t *testing.T) {
     }
 }
 
+// Ensures that a compaction will properly merge multiple TSM files
+func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
+    dir := MustTempDir()
+    defer os.RemoveAll(dir)
+
+    // write 2 TSM files with overlapping data for the same series
+    a1 := tsm1.NewValue(4, 1.1)
+    a2 := tsm1.NewValue(5, 1.1)
+    a3 := tsm1.NewValue(7, 1.1)
+
+    writes := map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{a1, a2, a3},
+    }
+    f1 := MustWriteTSM(dir, 1, writes)
+
+    c1 := tsm1.NewValue(3, 1.2)
+    c2 := tsm1.NewValue(8, 1.2)
+    c3 := tsm1.NewValue(9, 1.2)
+
+    writes = map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{c1, c2, c3},
+    }
+    f3 := MustWriteTSM(dir, 3, writes)
+
+    compactor := &tsm1.Compactor{
+        Dir:       dir,
+        FileStore: &fakeFileStore{},
+        Size:      2,
+    }
+
+    compactor.Open()
+
+    files, err := compactor.CompactFast([]string{f1, f3})
+    if err != nil {
+        t.Fatalf("unexpected error writing snapshot: %v", err)
+    }
+
+    if got, exp := len(files), 1; got != exp {
+        t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
+    }
+
+    r := MustOpenTSMReader(files[0])
+
+    if got, exp := r.KeyCount(), 1; got != exp {
+        t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
+    }
+
+    var data = []struct {
+        key    string
+        points []tsm1.Value
+    }{
+        {"cpu,host=A#!~#value", []tsm1.Value{c1, a1, a2, a3, c2, c3}},
+    }
+
+    for _, p := range data {
+        values, err := r.ReadAll(p.key)
+        if err != nil {
+            t.Fatalf("unexpected error reading: %v", err)
+        }
+
+        if got, exp := len(values), len(p.points); got != exp {
+            t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
+        }
+
+        for i, point := range p.points {
+            assertValueEqual(t, values[i], point)
+        }
+    }
+}
+
+// Ensures that a compaction will properly merge multiple TSM files
+func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
+    dir := MustTempDir()
+    defer os.RemoveAll(dir)
+
+    // write 3 TSM files with overlapping data for the same series
+    a1 := tsm1.NewValue(4, 1.1)
+    a2 := tsm1.NewValue(5, 1.1)
+    a3 := tsm1.NewValue(7, 1.1)
+
+    writes := map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{a1, a2, a3},
+    }
+    f1 := MustWriteTSM(dir, 1, writes)
+
+    b1 := tsm1.NewValue(1, 1.2)
+    b2 := tsm1.NewValue(2, 1.2)
+    b3 := tsm1.NewValue(6, 1.2)
+
+    writes = map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{b1, b2, b3},
+    }
+    f2 := MustWriteTSM(dir, 2, writes)
+
+    c1 := tsm1.NewValue(3, 1.2)
+    c2 := tsm1.NewValue(8, 1.2)
+    c3 := tsm1.NewValue(9, 1.2)
+
+    writes = map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{c1, c2, c3},
+    }
+    f3 := MustWriteTSM(dir, 3, writes)
+
+    compactor := &tsm1.Compactor{
+        Dir:       dir,
+        FileStore: &fakeFileStore{},
+        Size:      2,
+    }
+
+    compactor.Open()
+
+    files, err := compactor.CompactFast([]string{f1, f2, f3})
+    if err != nil {
+        t.Fatalf("unexpected error writing snapshot: %v", err)
+    }
+
+    if got, exp := len(files), 1; got != exp {
+        t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
+    }
+
+    r := MustOpenTSMReader(files[0])
+
+    if got, exp := r.KeyCount(), 1; got != exp {
+        t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
+    }
+
+    var data = []struct {
+        key    string
+        points []tsm1.Value
+    }{
+        {"cpu,host=A#!~#value", []tsm1.Value{b1, b2, c1, a1, a2, b3, a3, c2, c3}},
+    }
+
+    for _, p := range data {
+        values, err := r.ReadAll(p.key)
+        if err != nil {
+            t.Fatalf("unexpected error reading: %v", err)
+        }
+
+        if got, exp := len(values), len(p.points); got != exp {
+            t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
+        }
+
+        for i, point := range p.points {
+            assertValueEqual(t, values[i], point)
+        }
+    }
+}
+
 // Ensures that a compaction will properly merge multiple TSM files
 func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
     dir := MustTempDir()
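Note: what follows is a minimal, standalone Go sketch of the idea behind the fix, not the tsm1 implementation itself. The valueBlock type, the mergeWindow helper, and the sample timestamps are hypothetical; they only mirror what the change to combine() above does, namely widening the merge window's minimum time to the earliest start of any overlapping block before filtering with Include(minTime, maxTime), so that points left over from a partially recombined block are not dropped.

package main

import (
	"fmt"
	"sort"
)

// valueBlock is a hypothetical stand-in for a tsm1 block: an ordered run of
// point timestamps belonging to one series key.
type valueBlock struct {
	times []int64
}

func (b *valueBlock) minTime() int64 { return b.times[0] }
func (b *valueBlock) maxTime() int64 { return b.times[len(b.times)-1] }

// overlaps reports whether the block intersects the [min, max] window.
func (b *valueBlock) overlaps(min, max int64) bool {
	return b.minTime() <= max && b.maxTime() >= min
}

// mergeWindow mirrors the fix in combine(): start from the first block's time
// range, then widen the minimum to the earliest start of any overlapping
// block. Without this widening, a block that was partially recombined by a
// prior merge and still holds points before first.minTime would have those
// points filtered out and silently dropped.
func mergeWindow(blocks []*valueBlock) (minTime, maxTime int64) {
	first := blocks[0]
	minTime, maxTime = first.minTime(), first.maxTime()
	for _, b := range blocks {
		if b.overlaps(minTime, maxTime) && b.minTime() < minTime {
			minTime = b.minTime()
		}
	}
	return minTime, maxTime
}

func main() {
	// Layout loosely based on the new test cases: the first block covers
	// timestamps 4..7 while a second block still holds points at 3, 8 and 9.
	blocks := []*valueBlock{
		{times: []int64{4, 5, 7}},
		{times: []int64{3, 8, 9}},
	}

	minT, maxT := mergeWindow(blocks)
	fmt.Println("merge window:", minT, maxT) // 3 7 instead of 4 7

	// Keep every point that falls inside the widened window; points after
	// maxT (8 and 9 here) remain for a later merge pass.
	var merged []int64
	for _, b := range blocks {
		for _, t := range b.times {
			if t >= minT && t <= maxT {
				merged = append(merged, t)
			}
		}
	}
	sort.Slice(merged, func(i, j int) bool { return merged[i] < merged[j] })
	fmt.Println("merged points:", merged) // [3 4 5 7]; the point at 3 is kept
}

Run as an ordinary Go program, the sketch prints a merge window of [3, 7] rather than [4, 7], so the point at timestamp 3 survives the merge while 8 and 9 are left for a later pass, consistent with TestCompactor_Compact_OverlappingBlocks, where the point at timestamp 3 now precedes the points from the first block in the compacted output.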