fix: Allow compactor to make progress if v.MaxTime() != entry.MaxTime

pull/10616/head
Stuart Carnie 2018-11-13 20:32:14 -07:00 committed by Edd Robinson
parent bef0577206
commit 305ebb8729
2 changed files with 78 additions and 0 deletions

View File

@ -1076,6 +1076,14 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
if maxTime == k.blocks[i].maxTime {
maxTime = v.MaxTime()
}
k.blocks[i].maxTime = v.MaxTime()
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
@ -1152,6 +1160,11 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
k.blocks[i].maxTime = v.MaxTime()
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
@ -1282,6 +1295,14 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
if maxTime == k.blocks[i].maxTime {
maxTime = v.MaxTime()
}
k.blocks[i].maxTime = v.MaxTime()
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
@ -1358,6 +1379,11 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
k.blocks[i].maxTime = v.MaxTime()
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
@ -1488,6 +1514,14 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
if maxTime == k.blocks[i].maxTime {
maxTime = v.MaxTime()
}
k.blocks[i].maxTime = v.MaxTime()
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
@ -1564,6 +1598,11 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
k.blocks[i].maxTime = v.MaxTime()
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
@ -1694,6 +1733,14 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
if maxTime == k.blocks[i].maxTime {
maxTime = v.MaxTime()
}
k.blocks[i].maxTime = v.MaxTime()
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
@ -1770,6 +1817,11 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
k.blocks[i].maxTime = v.MaxTime()
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
@ -1900,6 +1952,14 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
if maxTime == k.blocks[i].maxTime {
maxTime = v.MaxTime()
}
k.blocks[i].maxTime = v.MaxTime()
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
@ -1976,6 +2036,11 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
k.blocks[i].maxTime = v.MaxTime()
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)

View File

@ -279,6 +279,14 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
if maxTime == k.blocks[i].maxTime {
maxTime = v.MaxTime()
}
k.blocks[i].maxTime = v.MaxTime()
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
@ -355,6 +363,11 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
return nil
}
// Invariant: v.MaxTime() == k.blocks[i].maxTime
if k.blocks[i].maxTime != v.MaxTime() {
k.blocks[i].maxTime = v.MaxTime()
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)