Fix race condition in the merge iterator close method
If the close happens when next is being called, it can result in a race condition where the current iterator gets set to nil after the initial check. This also fixes the finalizer so it runs the close method in a goroutine instead of running it by itself. This is because all finalizers run on the same goroutine so a close that takes a long time can cause a backup for all finalizers. This also removes the redundant call to `runtime.SetFinalizer` from the finalizer itself because a finalizer, when called, has already cleared itself.pull/9163/head
parent
f23dc1515e
commit
a73c3a1965
|
@ -15,6 +15,7 @@
|
|||
- [#9065](https://github.com/influxdata/influxdb/pull/9065): Refuse extra arguments to influx CLI
|
||||
- [#9058](https://github.com/influxdata/influxdb/issues/9058): Fix space required after regex operator. Thanks @stop-start!
|
||||
- [#9109](https://github.com/influxdata/influxdb/issues/9109): Fix: panic: sync: WaitGroup is reused before previous Wait has returned
|
||||
- [#9163](https://github.com/influxdata/influxdb/pull/9163): Fix race condition in the merge iterator close method.
|
||||
|
||||
## v1.4.2 [2017-11-15]
|
||||
|
||||
|
|
|
@ -112,6 +112,9 @@ type floatMergeIterator struct {
|
|||
heap *floatMergeHeap
|
||||
init bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
|
||||
// Current iterator and window.
|
||||
curr *floatMergeHeapItem
|
||||
window struct {
|
||||
|
@ -140,6 +143,7 @@ func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMe
|
|||
// Append to the heap.
|
||||
itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput})
|
||||
}
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
|
@ -154,17 +158,27 @@ func (itr *floatMergeIterator) Stats() IteratorStats {
|
|||
|
||||
// Close closes the underlying iterators.
|
||||
func (itr *floatMergeIterator) Close() error {
|
||||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *floatMergeIterator) Next() (*FloatPoint, error) {
|
||||
itr.mu.RLock()
|
||||
defer itr.mu.RUnlock()
|
||||
if itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
||||
// so that iterator initialization done through the Select() call returns quickly.
|
||||
// Queries can only be interrupted after the Select() call completes so any operations
|
||||
|
@ -3514,6 +3528,9 @@ type integerMergeIterator struct {
|
|||
heap *integerMergeHeap
|
||||
init bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
|
||||
// Current iterator and window.
|
||||
curr *integerMergeHeapItem
|
||||
window struct {
|
||||
|
@ -3557,17 +3574,27 @@ func (itr *integerMergeIterator) Stats() IteratorStats {
|
|||
|
||||
// Close closes the underlying iterators.
|
||||
func (itr *integerMergeIterator) Close() error {
|
||||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *integerMergeIterator) Next() (*IntegerPoint, error) {
|
||||
itr.mu.RLock()
|
||||
defer itr.mu.RUnlock()
|
||||
if itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
||||
// so that iterator initialization done through the Select() call returns quickly.
|
||||
// Queries can only be interrupted after the Select() call completes so any operations
|
||||
|
@ -6914,6 +6941,9 @@ type unsignedMergeIterator struct {
|
|||
heap *unsignedMergeHeap
|
||||
init bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
|
||||
// Current iterator and window.
|
||||
curr *unsignedMergeHeapItem
|
||||
window struct {
|
||||
|
@ -6957,17 +6987,27 @@ func (itr *unsignedMergeIterator) Stats() IteratorStats {
|
|||
|
||||
// Close closes the underlying iterators.
|
||||
func (itr *unsignedMergeIterator) Close() error {
|
||||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *unsignedMergeIterator) Next() (*UnsignedPoint, error) {
|
||||
itr.mu.RLock()
|
||||
defer itr.mu.RUnlock()
|
||||
if itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
||||
// so that iterator initialization done through the Select() call returns quickly.
|
||||
// Queries can only be interrupted after the Select() call completes so any operations
|
||||
|
@ -10314,6 +10354,9 @@ type stringMergeIterator struct {
|
|||
heap *stringMergeHeap
|
||||
init bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
|
||||
// Current iterator and window.
|
||||
curr *stringMergeHeapItem
|
||||
window struct {
|
||||
|
@ -10357,17 +10400,27 @@ func (itr *stringMergeIterator) Stats() IteratorStats {
|
|||
|
||||
// Close closes the underlying iterators.
|
||||
func (itr *stringMergeIterator) Close() error {
|
||||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *stringMergeIterator) Next() (*StringPoint, error) {
|
||||
itr.mu.RLock()
|
||||
defer itr.mu.RUnlock()
|
||||
if itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
||||
// so that iterator initialization done through the Select() call returns quickly.
|
||||
// Queries can only be interrupted after the Select() call completes so any operations
|
||||
|
@ -13700,6 +13753,9 @@ type booleanMergeIterator struct {
|
|||
heap *booleanMergeHeap
|
||||
init bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
|
||||
// Current iterator and window.
|
||||
curr *booleanMergeHeapItem
|
||||
window struct {
|
||||
|
@ -13743,17 +13799,27 @@ func (itr *booleanMergeIterator) Stats() IteratorStats {
|
|||
|
||||
// Close closes the underlying iterators.
|
||||
func (itr *booleanMergeIterator) Close() error {
|
||||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *booleanMergeIterator) Next() (*BooleanPoint, error) {
|
||||
itr.mu.RLock()
|
||||
defer itr.mu.RUnlock()
|
||||
if itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
||||
// so that iterator initialization done through the Select() call returns quickly.
|
||||
// Queries can only be interrupted after the Select() call completes so any operations
|
||||
|
|
|
@ -110,6 +110,9 @@ type {{$k.name}}MergeIterator struct {
|
|||
heap *{{$k.name}}MergeHeap
|
||||
init bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
|
||||
// Current iterator and window.
|
||||
curr *{{$k.name}}MergeHeapItem
|
||||
window struct {
|
||||
|
@ -153,17 +156,27 @@ func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
|
|||
|
||||
// Close closes the underlying iterators.
|
||||
func (itr *{{$k.name}}MergeIterator) Close() error {
|
||||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) {
|
||||
itr.mu.RLock()
|
||||
defer itr.mu.RUnlock()
|
||||
if itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
||||
// so that iterator initialization done through the Select() call returns quickly.
|
||||
// Queries can only be interrupted after the Select() call completes so any operations
|
||||
|
|
|
@ -135,9 +135,10 @@ func newFloatFinalizerIterator(inner query.FloatIterator, logger *zap.Logger) *f
|
|||
}
|
||||
|
||||
func (itr *floatFinalizerIterator) closeGC() {
|
||||
runtime.SetFinalizer(itr, nil)
|
||||
itr.logger.Error("FloatIterator finalized by GC")
|
||||
itr.Close()
|
||||
go func() {
|
||||
itr.logger.Error("FloatIterator finalized by GC")
|
||||
itr.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
func (itr *floatFinalizerIterator) Close() error {
|
||||
|
@ -598,9 +599,10 @@ func newIntegerFinalizerIterator(inner query.IntegerIterator, logger *zap.Logger
|
|||
}
|
||||
|
||||
func (itr *integerFinalizerIterator) closeGC() {
|
||||
runtime.SetFinalizer(itr, nil)
|
||||
itr.logger.Error("IntegerIterator finalized by GC")
|
||||
itr.Close()
|
||||
go func() {
|
||||
itr.logger.Error("IntegerIterator finalized by GC")
|
||||
itr.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
func (itr *integerFinalizerIterator) Close() error {
|
||||
|
@ -1061,9 +1063,10 @@ func newUnsignedFinalizerIterator(inner query.UnsignedIterator, logger *zap.Logg
|
|||
}
|
||||
|
||||
func (itr *unsignedFinalizerIterator) closeGC() {
|
||||
runtime.SetFinalizer(itr, nil)
|
||||
itr.logger.Error("UnsignedIterator finalized by GC")
|
||||
itr.Close()
|
||||
go func() {
|
||||
itr.logger.Error("UnsignedIterator finalized by GC")
|
||||
itr.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
func (itr *unsignedFinalizerIterator) Close() error {
|
||||
|
@ -1524,9 +1527,10 @@ func newStringFinalizerIterator(inner query.StringIterator, logger *zap.Logger)
|
|||
}
|
||||
|
||||
func (itr *stringFinalizerIterator) closeGC() {
|
||||
runtime.SetFinalizer(itr, nil)
|
||||
itr.logger.Error("StringIterator finalized by GC")
|
||||
itr.Close()
|
||||
go func() {
|
||||
itr.logger.Error("StringIterator finalized by GC")
|
||||
itr.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
func (itr *stringFinalizerIterator) Close() error {
|
||||
|
@ -1987,9 +1991,10 @@ func newBooleanFinalizerIterator(inner query.BooleanIterator, logger *zap.Logger
|
|||
}
|
||||
|
||||
func (itr *booleanFinalizerIterator) closeGC() {
|
||||
runtime.SetFinalizer(itr, nil)
|
||||
itr.logger.Error("BooleanIterator finalized by GC")
|
||||
itr.Close()
|
||||
go func() {
|
||||
itr.logger.Error("BooleanIterator finalized by GC")
|
||||
itr.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
func (itr *booleanFinalizerIterator) Close() error {
|
||||
|
|
|
@ -131,9 +131,10 @@ func new{{.Name}}FinalizerIterator(inner query.{{.Name}}Iterator, logger *zap.Lo
|
|||
}
|
||||
|
||||
func (itr *{{.name}}FinalizerIterator) closeGC() {
|
||||
runtime.SetFinalizer(itr, nil)
|
||||
itr.logger.Error("{{.Name}}Iterator finalized by GC")
|
||||
itr.Close()
|
||||
go func() {
|
||||
itr.logger.Error("{{.Name}}Iterator finalized by GC")
|
||||
itr.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
func (itr *{{.name}}FinalizerIterator) Close() error {
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
func BenchmarkIntegerIterator_Next(b *testing.B) {
|
||||
opt := query.IteratorOptions{
|
||||
Aux: []influxql.VarRef{{Val: "f1"}, {Val: "f1"}, {Val: "f1"}, {Val: "f1"}},
|
||||
}
|
||||
aux := []cursorAt{
|
||||
&literalValueCursor{value: "foo bar"},
|
||||
&literalValueCursor{value: int64(1e3)},
|
||||
&literalValueCursor{value: float64(1e3)},
|
||||
&literalValueCursor{value: true},
|
||||
}
|
||||
|
||||
cur := newIntegerIterator("m0", query.Tags{}, opt, &infiniteIntegerCursor{}, aux, nil, nil)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cur.Next()
|
||||
}
|
||||
}
|
||||
|
||||
type infiniteIntegerCursor struct{}
|
||||
|
||||
func (*infiniteIntegerCursor) close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*infiniteIntegerCursor) next() (t int64, v interface{}) {
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
func (*infiniteIntegerCursor) nextInteger() (t int64, v int64) {
|
||||
return 0, 0
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"os"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
func BenchmarkIntegerIterator_Next(b *testing.B) {
|
||||
opt := query.IteratorOptions{
|
||||
Aux: []influxql.VarRef{{Val: "f1"}, {Val: "f1"}, {Val: "f1"}, {Val: "f1"}},
|
||||
}
|
||||
aux := []cursorAt{
|
||||
&literalValueCursor{value: "foo bar"},
|
||||
&literalValueCursor{value: int64(1e3)},
|
||||
&literalValueCursor{value: float64(1e3)},
|
||||
&literalValueCursor{value: true},
|
||||
}
|
||||
|
||||
cur := newIntegerIterator("m0", query.Tags{}, opt, &infiniteIntegerCursor{}, aux, nil, nil)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cur.Next()
|
||||
}
|
||||
}
|
||||
|
||||
type infiniteIntegerCursor struct{}
|
||||
|
||||
func (*infiniteIntegerCursor) close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*infiniteIntegerCursor) next() (t int64, v interface{}) {
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
func (*infiniteIntegerCursor) nextInteger() (t int64, v int64) {
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
type testFinalizerIterator struct {
|
||||
OnClose func()
|
||||
}
|
||||
|
||||
func (itr *testFinalizerIterator) Next() (*query.FloatPoint, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (itr *testFinalizerIterator) Close() error {
|
||||
// Act as if this is a slow finalizer and ensure that it doesn't block
|
||||
// the finalizer background thread.
|
||||
itr.OnClose()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *testFinalizerIterator) Stats() query.IteratorStats {
|
||||
return query.IteratorStats{}
|
||||
}
|
||||
|
||||
func TestFinalizerIterator(t *testing.T) {
|
||||
var (
|
||||
step1 = make(chan struct{})
|
||||
step2 = make(chan struct{})
|
||||
step3 = make(chan struct{})
|
||||
)
|
||||
|
||||
l := logger.New(os.Stderr)
|
||||
done := make(chan struct{})
|
||||
func() {
|
||||
itr := &testFinalizerIterator{
|
||||
OnClose: func() {
|
||||
// Simulate a slow closing iterator by waiting for the done channel
|
||||
// to be closed. The done channel is closed by a later finalizer.
|
||||
close(step1)
|
||||
<-done
|
||||
close(step3)
|
||||
},
|
||||
}
|
||||
newFinalizerIterator(itr, l)
|
||||
}()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
runtime.GC()
|
||||
}
|
||||
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case <-timer.C:
|
||||
t.Fatal("The finalizer for the iterator did not run")
|
||||
close(done)
|
||||
case <-step1:
|
||||
// The finalizer has successfully run, but should not have completed yet.
|
||||
timer.Stop()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-step3:
|
||||
t.Fatal("The finalizer should not have finished yet")
|
||||
default:
|
||||
}
|
||||
|
||||
// Use a fake value that will be collected by the garbage collector and have
|
||||
// the finalizer close the channel. This finalizer should run after the iterator's
|
||||
// finalizer.
|
||||
value := func() int {
|
||||
foo := &struct {
|
||||
value int
|
||||
}{value: 1}
|
||||
runtime.SetFinalizer(foo, func(value interface{}) {
|
||||
close(done)
|
||||
close(step2)
|
||||
})
|
||||
return foo.value + 2
|
||||
}()
|
||||
if value < 2 {
|
||||
t.Log("This should never be output")
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
runtime.GC()
|
||||
}
|
||||
|
||||
timer.Reset(100 * time.Millisecond)
|
||||
select {
|
||||
case <-timer.C:
|
||||
t.Fatal("The second finalizer did not run")
|
||||
case <-step2:
|
||||
// The finalizer has successfully run and should have
|
||||
// closed the done channel.
|
||||
timer.Stop()
|
||||
}
|
||||
|
||||
// Wait for step3 to finish where the closed value should be set.
|
||||
timer.Reset(100 * time.Millisecond)
|
||||
select {
|
||||
case <-timer.C:
|
||||
t.Fatal("The iterator was not finalized")
|
||||
case <-step3:
|
||||
timer.Stop()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue