diff --git a/CHANGELOG.md b/CHANGELOG.md index 454e7915c4..942bea9323 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - [#21978](https://github.com/influxdata/influxdb/pull/21978): fix: restore portable backup bug - [#21992](https://github.com/influxdata/influxdb/pull/21992): fix: systemd-start script should be executable by group and others - [#22019](https://github.com/influxdata/influxdb/pull/22019): fix: error instead of panic when enterprise tries to restore with OSS +- [#22040](https://github.com/influxdata/influxdb/pull/22040): fix: copy names from mmapped memory before closing iterator v1.9.2 [unreleased] - [#21631](https://github.com/influxdata/influxdb/pull/21631): fix: group by returns multiple results per group in some circumstances diff --git a/tsdb/index.go b/tsdb/index.go index 487db2fe2e..210a024c02 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -949,6 +949,11 @@ func (a MeasurementIterators) Close() (err error) { return err } +type MeasurementSliceIterator interface { + MeasurementIterator + UnderlyingSlice() [][]byte +} + type measurementSliceIterator struct { names [][]byte } @@ -968,6 +973,37 @@ func (itr *measurementSliceIterator) Next() (name []byte, err error) { return name, nil } +func (itr *measurementSliceIterator) UnderlyingSlice() [][]byte { + return itr.names +} + +// fileMeasurementSliceIterator is designed to allow a tag value slice +// iterator to use memory from a memory-mapped file, pinning it +// with the underlying file iterators +type fileMeasurementSliceIterator struct { + measurementSliceIterator + fileIterators MeasurementIterators +} + +func (itr *fileMeasurementSliceIterator) Close() error { + e1 := itr.fileIterators.Close() + e2 := itr.measurementSliceIterator.Close() + if e1 != nil { + return e1 + } else { + return e2 + } +} + +func newFileMeasurementSliceIterator(names [][]byte, itrs MeasurementIterators) *fileMeasurementSliceIterator { + return &fileMeasurementSliceIterator{ + measurementSliceIterator: measurementSliceIterator{ + names: names, + }, + fileIterators: itrs, + } +} + // MergeMeasurementIterators returns an iterator that merges a set of iterators. // Iterators that are first in the list take precedence and a deletion by those // early iterators will invalidate elements by later iterators. @@ -1320,17 +1356,24 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet { // MeasurementNamesByExpr returns a slice of measurement names matching the // provided condition. If no condition is provided then all names are returned. -func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (_ [][]byte, err error) { release := is.SeriesFile.Retain() defer release() // Return filtered list if expression exists. if expr != nil { - names, err := is.measurementNamesByExpr(auth, expr) + itr, err := is.measurementNamesByExpr(auth, expr) if err != nil { return nil, err + } else if itr == nil { + return nil, nil } - return slices.CopyChunkedByteSlices(names, 1000), nil + defer func() { + if e := itr.Close(); err == nil { + err = e + } + }() + return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil } itr, err := is.measurementIterator() @@ -1339,10 +1382,14 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx } else if itr == nil { return nil, nil } - defer itr.Close() + defer func() { + if e := itr.Close(); err == nil { + err = e + } + }() - // Iterate over all measurements if no condition exists. var names [][]byte + // Iterate over all measurements if no condition exists. for { e, err := itr.Next() if err != nil { @@ -1360,7 +1407,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx return slices.CopyChunkedByteSlices(names, 1000), nil } -func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementSliceIterator, error) { if expr == nil { return nil, nil } @@ -1400,20 +1447,22 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex) case influxql.OR, influxql.AND: + lhs, err := is.measurementNamesByExpr(auth, e.LHS) if err != nil { return nil, err } - rhs, err := is.measurementNamesByExpr(auth, e.RHS) if err != nil { + lhs.Close() return nil, err } + mis := MeasurementIterators{lhs, rhs} if e.Op == influxql.OR { - return bytesutil.Union(lhs, rhs), nil + return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil } - return bytesutil.Intersect(lhs, rhs), nil + return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil default: return nil, fmt.Errorf("invalid tag comparison operator") @@ -1422,24 +1471,24 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx case *influxql.ParenExpr: return is.measurementNamesByExpr(auth, e.Expr) default: - return nil, fmt.Errorf("Invalid measurement expression %#v", expr) + return nil, fmt.Errorf("invalid measurement expression %#v", expr) } } // measurementNamesByNameFilter returns matching measurement names in sorted order. -func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { +func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) { itr, err := is.measurementIterator() if err != nil { return nil, err } else if itr == nil { return nil, nil } - defer itr.Close() var names [][]byte for { e, err := itr.Next() if err != nil { + itr.Close() return nil, err } else if e == nil { break @@ -1462,24 +1511,31 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op in } } bytesutil.Sort(names) - return names, nil + return newFileMeasurementSliceIterator(names, MeasurementIterators{itr}), nil } // MeasurementNamesByPredicate returns a slice of measurement names matching the // provided condition. If no condition is provided then all names are returned. // This behaves differently from MeasurementNamesByExpr because it will // return measurements using flux predicates. -func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (_ [][]byte, err error) { release := is.SeriesFile.Retain() defer release() // Return filtered list if expression exists. if expr != nil { - names, err := is.measurementNamesByPredicate(auth, expr) + itr, err := is.measurementNamesByPredicate(auth, expr) if err != nil { return nil, err } - return slices.CopyChunkedByteSlices(names, 1000), nil + if itr != nil { + defer func() { + if e := itr.Close(); err == nil { + err = e + } + }() + } + return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil } itr, err := is.measurementIterator() @@ -1488,10 +1544,14 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i } else if itr == nil { return nil, nil } - defer itr.Close() + defer func() { + if e := itr.Close(); err == nil { + err = e + } + }() - // Iterate over all measurements if no condition exists. var names [][]byte + // Iterate over all measurements if no condition exists. for { e, err := itr.Next() if err != nil { @@ -1509,7 +1569,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i return slices.CopyChunkedByteSlices(names, 1000), nil } -func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementSliceIterator, error) { if expr == nil { return nil, nil } @@ -1553,16 +1613,17 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i if err != nil { return nil, err } - rhs, err := is.measurementNamesByPredicate(auth, e.RHS) if err != nil { + lhs.Close() return nil, err } + mis := MeasurementIterators{lhs, rhs} if e.Op == influxql.OR { - return bytesutil.Union(lhs, rhs), nil + return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil } - return bytesutil.Intersect(lhs, rhs), nil + return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil default: return nil, fmt.Errorf("invalid tag comparison operator") @@ -1575,8 +1636,9 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i } } -func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { +func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) { var names [][]byte + failed := true mitr, err := is.measurementIterator() if err != nil { @@ -1584,7 +1646,11 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op inf } else if mitr == nil { return nil, nil } - defer mitr.Close() + defer func() { + if failed { + mitr.Close() + } + }() // valEqual determines if the provided []byte is equal to the tag value // to be filtered on. @@ -1699,11 +1765,13 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op inf } bytesutil.Sort(names) - return names, nil + failed = false + return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil } -func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { +func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) { var names [][]byte + failed := true mitr, err := is.measurementIterator() if err != nil { @@ -1711,7 +1779,11 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op } else if mitr == nil { return nil, nil } - defer mitr.Close() + defer func() { + if failed { + mitr.Close() + } + }() var checkMeasurement func(auth query.FineAuthorizer, me []byte) (bool, error) switch op { @@ -1762,7 +1834,8 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op } bytesutil.Sort(names) - return names, nil + failed = false + return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil } // measurementAuthorizedSeries determines if the measurement contains a series