Ensure that errors from closing the
iterators underneath a MergeIterator
are returned up the stack.
(cherry picked from commit 5fda409f39
)
closes https://github.com/influxdata/influxdb/issues/24977
flux-staging/v0.195.1
parent
0a4d41bc90
commit
a97566bc31
|
@ -10,6 +10,7 @@ package query
|
|||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -162,14 +163,15 @@ func (itr *floatMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -2825,14 +2827,15 @@ func (itr *integerMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -5488,14 +5491,15 @@ func (itr *unsignedMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -8151,14 +8155,15 @@ func (itr *stringMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
@ -10800,14 +10805,15 @@ func (itr *booleanMergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
|
|
@ -4,6 +4,7 @@ package query
|
|||
import (
|
||||
"context"
|
||||
"container/heap"
|
||||
"errors"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -159,14 +160,15 @@ func (itr *{{$k.name}}MergeIterator) Close() error {
|
|||
itr.mu.Lock()
|
||||
defer itr.mu.Unlock()
|
||||
|
||||
var errs []error
|
||||
for _, input := range itr.inputs {
|
||||
input.Close()
|
||||
errs = append(errs, input.Close())
|
||||
}
|
||||
itr.curr = nil
|
||||
itr.inputs = nil
|
||||
itr.heap.items = nil
|
||||
itr.closed = true
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Next returns the next point from the iterator.
|
||||
|
|
Loading…
Reference in New Issue