fix(tsdb): Defer closing of underlying SeriesIDSetIterators

This commit changes the SeriesIDSet merge/union/intersect functions
to attach the underlying iterators as closers so that files can be
retained until the data is no longer in use. The roaring operations
can leave containers pointing at mmap data in the resulting bitmap
so we have to track underlying file usage until the data is finished
with.
pull/18203/head
Ben Johnson 2020-05-22 10:46:05 -06:00
parent ca420c601d
commit 51f647d763
1 changed files with 34 additions and 14 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"os"
"regexp"
"sort"
@ -185,6 +186,7 @@ type SeriesIDSetIterator interface {
type seriesIDSetIterator struct {
ss *SeriesIDSet
itr SeriesIDSetIterable
closer io.Closer
}
func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator {
@ -194,6 +196,13 @@ func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator {
return &seriesIDSetIterator{ss: ss, itr: ss.Iterator()}
}
func NewSeriesIDSetIteratorWithCloser(ss *SeriesIDSet, closer io.Closer) SeriesIDSetIterator {
if ss == nil || ss.bitmap == nil {
return nil
}
return &seriesIDSetIterator{ss: ss, itr: ss.Iterator(), closer: closer}
}
func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) {
if !itr.itr.HasNext() {
return SeriesIDElem{}, nil
@ -201,10 +210,26 @@ func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) {
return SeriesIDElem{SeriesID: uint64(itr.itr.Next())}, nil
}
func (itr *seriesIDSetIterator) Close() error { return nil }
func (itr *seriesIDSetIterator) Close() error {
if itr.closer != nil {
return itr.closer.Close()
}
return nil
}
func (itr *seriesIDSetIterator) SeriesIDSet() *SeriesIDSet { return itr.ss }
type SeriesIDSetIterators []SeriesIDSetIterator
func (a SeriesIDSetIterators) Close() (err error) {
for i := range a {
if e := a[i].Close(); e != nil && err == nil {
err = e
}
}
return err
}
// NewSeriesIDSetIterators returns a slice of SeriesIDSetIterator if all itrs
// can be type casted. Otherwise returns nil.
func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator {
@ -448,8 +473,9 @@ func MergeSeriesIDIterators(itrs ...SeriesIDIterator) SeriesIDIterator {
ss := NewSeriesIDSet()
ss.Merge(sets...)
SeriesIDIterators(itrs).Close()
return NewSeriesIDSetIterator(ss)
// Attach underlying iterators as the closer
return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
}
return &seriesIDMergeIterator{
@ -522,9 +548,7 @@ func IntersectSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
// Create series id set, if available.
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
return NewSeriesIDSetIterator(a[0].SeriesIDSet().And(a[1].SeriesIDSet()))
return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().And(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
}
return &seriesIDIntersectIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
@ -611,11 +635,9 @@ func UnionSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
// Create series id set, if available.
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
ss := NewSeriesIDSet()
ss.Merge(a[0].SeriesIDSet(), a[1].SeriesIDSet())
return NewSeriesIDSetIterator(ss)
return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
}
return &seriesIDUnionIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
@ -698,9 +720,7 @@ func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
// Create series id set, if available.
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
return NewSeriesIDSetIterator(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()))
return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
}
return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}