Merge pull request #8094 from influxdata/jw-8084

Fix points missing after compaction
pull/8099/head^2
Jason Wilder 2017-03-06 10:27:20 -07:00 committed by GitHub
commit 58dd3dbefb
3 changed files with 182 additions and 17 deletions

View File

@ -19,6 +19,7 @@
- [#8044](https://github.com/influxdata/influxdb/issues/8044): Treat non-reserved measurement names with underscores as normal measurements.
- [#8078](https://github.com/influxdata/influxdb/issues/8078): Map types correctly when selecting a field with multiple measurements where one of the measurements is empty.
- [#8080](https://github.com/influxdata/influxdb/issues/8080): Point.UnmarshalBinary() bounds check
- [#8084](https://github.com/influxdata/influxdb/issues/8084): Points missing after compaction
- [#8085](https://github.com/influxdata/influxdb/issues/8085): panic: interface conversion: tsm1.Value is tsm1.IntegerValue, not tsm1.FloatValue.
- [#8095](https://github.com/influxdata/influxdb/pull/8095): Fix race in WALEntry.Encode and Values.Deduplicate

View File

@ -930,6 +930,10 @@ func (b *block) markRead(min, max int64) {
}
}
// partiallyRead reports whether only a subset of the block's time range has
// been consumed so far, i.e. the recorded read bounds do not span the block's
// full [minTime, maxTime] interval.
func (b *block) partiallyRead() bool {
	return !(b.readMin == b.minTime && b.readMax == b.maxTime)
}
// blocks is a collection of *block; the methods on it (Len here, plus the
// Less/Swap defined elsewhere in this file) make it sortable.
type blocks []*block

// Len returns the number of blocks in the collection.
func (a blocks) Len() int { return len(a) }
@ -1077,25 +1081,25 @@ func (k *tsmKeyIterator) merge() {
return
}
dedup := false
if len(k.blocks) > 0 {
dedup := len(k.mergedValues) > 0
if len(k.blocks) > 0 && !dedup {
// If we have more than one block or any partially tombstoned blocks, we may need to dedup
dedup = len(k.blocks[0].tombstones) > 0
dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()
if len(k.blocks) > 1 {
// Quickly scan each block to see if any overlap with the prior block, if they overlap then
// we need to dedup as there may be duplicate points now
for i := 1; !dedup && i < len(k.blocks); i++ {
if k.blocks[i].read() {
dedup = true
break
}
if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
dedup = true
break
}
// Quickly scan each block to see if any overlap with the prior block, if they overlap then
// we need to dedup as there may be duplicate points now
for i := 1; !dedup && i < len(k.blocks); i++ {
if k.blocks[i].partiallyRead() {
dedup = true
break
}
if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
dedup = true
break
}
}
}
k.merged = k.combine(dedup)
@ -1115,10 +1119,21 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
break
}
first := k.blocks[0]
minTime := first.minTime
maxTime := first.maxTime
// Adjust the min time to the start of any overlapping blocks.
for i := 0; i < len(k.blocks); i++ {
if k.blocks[i].overlapsTimeRange(minTime, maxTime) {
if k.blocks[i].minTime < minTime {
minTime = k.blocks[i].minTime
}
}
}
// We have some overlapping blocks so decode all, append in order and then dedup
for i := 0; i < len(k.blocks); i++ {
if !k.blocks[i].overlapsTimeRange(first.minTime, first.maxTime) || k.blocks[i].read() {
if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
continue
}
@ -1132,7 +1147,7 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
v = Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
// Filter out only the values for overlapping block
v = Values(v).Include(first.minTime, first.maxTime)
v = Values(v).Include(minTime, maxTime)
if len(v) > 0 {
// Record that we read a subset of the block
k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())

View File

@ -191,6 +191,155 @@ func TestCompactor_CompactFull(t *testing.T) {
}
}
// Ensures that compacting TSM files whose blocks overlap in time merges the
// points back into a single, correctly time-ordered series.
func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// Write two TSM files whose time ranges overlap: f1 covers [4,7] and
	// f3 covers [3,9], so compaction must interleave their points.
	a1 := tsm1.NewValue(4, 1.1)
	a2 := tsm1.NewValue(5, 1.1)
	a3 := tsm1.NewValue(7, 1.1)

	writes := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {a1, a2, a3},
	}
	f1 := MustWriteTSM(dir, 1, writes)

	c1 := tsm1.NewValue(3, 1.2)
	c2 := tsm1.NewValue(8, 1.2)
	c3 := tsm1.NewValue(9, 1.2)

	writes = map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {c1, c2, c3},
	}
	f3 := MustWriteTSM(dir, 3, writes)

	// Small Size forces multiple output blocks, exercising the overlap path.
	compactor := &tsm1.Compactor{
		Dir:       dir,
		FileStore: &fakeFileStore{},
		Size:      2,
	}

	compactor.Open()

	files, err := compactor.CompactFast([]string{f1, f3})
	if err != nil {
		t.Fatalf("unexpected error writing snapshot: %v", err)
	}

	if got, exp := len(files), 1; got != exp {
		t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
	}

	r := MustOpenTSMReader(files[0])

	if got, exp := r.KeyCount(), 1; got != exp {
		t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
	}

	// Expect every point from both files, sorted by timestamp (3..9).
	var data = []struct {
		key    string
		points []tsm1.Value
	}{
		{"cpu,host=A#!~#value", []tsm1.Value{c1, a1, a2, a3, c2, c3}},
	}

	for _, p := range data {
		values, err := r.ReadAll(p.key)
		if err != nil {
			t.Fatalf("unexpected error reading: %v", err)
		}

		if got, exp := len(values), len(p.points); got != exp {
			t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
		}

		for i, point := range p.points {
			assertValueEqual(t, values[i], point)
		}
	}
}
// Ensures that compacting three TSM files with mutually overlapping time
// ranges merges all points into a single, correctly time-ordered series.
func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// Write three TSM files whose time ranges interleave: f1 covers [4,7],
	// f2 covers [1,6] and f3 covers [3,9].
	a1 := tsm1.NewValue(4, 1.1)
	a2 := tsm1.NewValue(5, 1.1)
	a3 := tsm1.NewValue(7, 1.1)

	writes := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {a1, a2, a3},
	}
	f1 := MustWriteTSM(dir, 1, writes)

	b1 := tsm1.NewValue(1, 1.2)
	b2 := tsm1.NewValue(2, 1.2)
	b3 := tsm1.NewValue(6, 1.2)

	writes = map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {b1, b2, b3},
	}
	f2 := MustWriteTSM(dir, 2, writes)

	c1 := tsm1.NewValue(3, 1.2)
	c2 := tsm1.NewValue(8, 1.2)
	c3 := tsm1.NewValue(9, 1.2)

	writes = map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {c1, c2, c3},
	}
	f3 := MustWriteTSM(dir, 3, writes)

	// Small Size forces multiple output blocks, exercising the overlap path.
	compactor := &tsm1.Compactor{
		Dir:       dir,
		FileStore: &fakeFileStore{},
		Size:      2,
	}

	compactor.Open()

	files, err := compactor.CompactFast([]string{f1, f2, f3})
	if err != nil {
		t.Fatalf("unexpected error writing snapshot: %v", err)
	}

	if got, exp := len(files), 1; got != exp {
		t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
	}

	r := MustOpenTSMReader(files[0])

	if got, exp := r.KeyCount(), 1; got != exp {
		t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
	}

	// Expect every point from all three files, sorted by timestamp (1..9).
	var data = []struct {
		key    string
		points []tsm1.Value
	}{
		{"cpu,host=A#!~#value", []tsm1.Value{b1, b2, c1, a1, a2, b3, a3, c2, c3}},
	}

	for _, p := range data {
		values, err := r.ReadAll(p.key)
		if err != nil {
			t.Fatalf("unexpected error reading: %v", err)
		}

		if got, exp := len(values), len(p.points); got != exp {
			t.Fatalf("values length mismatch %s: got %v, exp %v", p.key, got, exp)
		}

		for i, point := range p.points {
			assertValueEqual(t, values[i], point)
		}
	}
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
dir := MustTempDir()