Ensure that errors from closing the iterators underneath a MergeIterator are returned up the stack. (cherry picked from commitpull/25060/head5fda409f39
) closes https://github.com/influxdata/influxdb/issues/24977 (cherry picked from commita97566bc31
) closes https://github.com/influxdata/influxdb/issues/24978
parent
bc42088131
commit
00b688888a
|
@ -10,6 +10,7 @@ package query
|
|||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -162,14 +163,15 @@ func (itr *floatMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -2825,14 +2827,15 @@ func (itr *integerMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -5488,14 +5491,15 @@ func (itr *unsignedMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -8151,14 +8155,15 @@ func (itr *stringMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -10800,14 +10805,15 @@ func (itr *booleanMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
|
|
@ -4,6 +4,7 @@ package query
|
|||
import (
|
||||
"context"
|
||||
"container/heap"
|
||||
"errors"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -159,14 +160,15 @@ func (itr *{{$k.name}}MergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
|
Loading…
Reference in New Issue