fix: copy names from mmapped memory before closing iterator (#22040)

This fix ensures that memory-mapped files are not released
before pointers into them are copied into heap memory.
MeasurementNamesByExpr() and MeasurementNamesByPredicate() can
cause panics by copying memory from mmapped files that have been
released. The functions they call use iterators to files which
are closed (releasing the mmapped files) before the memory is
safely copied to the heap.

closes https://github.com/influxdata/influxdb/issues/22000
pull/22103/head
davidby-influx 2021-08-04 13:16:00 -07:00 committed by GitHub
parent d0fe2ba2ff
commit a989f8f8b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 28 deletions

View File

@ -27,6 +27,7 @@
- [#21978](https://github.com/influxdata/influxdb/pull/21978): fix: restore portable backup bug
- [#21992](https://github.com/influxdata/influxdb/pull/21992): fix: systemd-start script should be executable by group and others
- [#22019](https://github.com/influxdata/influxdb/pull/22019): fix: error instead of panic when enterprise tries to restore with OSS
- [#22040](https://github.com/influxdata/influxdb/pull/22040): fix: copy names from mmapped memory before closing iterator
v1.9.2 [unreleased]
- [#21631](https://github.com/influxdata/influxdb/pull/21631): fix: group by returns multiple results per group in some circumstances

View File

@ -949,6 +949,11 @@ func (a MeasurementIterators) Close() (err error) {
return err
}
type MeasurementSliceIterator interface {
MeasurementIterator
UnderlyingSlice() [][]byte
}
type measurementSliceIterator struct {
names [][]byte
}
@ -968,6 +973,37 @@ func (itr *measurementSliceIterator) Next() (name []byte, err error) {
return name, nil
}
func (itr *measurementSliceIterator) UnderlyingSlice() [][]byte {
return itr.names
}
// fileMeasurementSliceIterator is designed to allow a tag value slice
// iterator to use memory from a memory-mapped file, pinning it
// with the underlying file iterators
type fileMeasurementSliceIterator struct {
measurementSliceIterator
fileIterators MeasurementIterators
}
func (itr *fileMeasurementSliceIterator) Close() error {
e1 := itr.fileIterators.Close()
e2 := itr.measurementSliceIterator.Close()
if e1 != nil {
return e1
} else {
return e2
}
}
func newFileMeasurementSliceIterator(names [][]byte, itrs MeasurementIterators) *fileMeasurementSliceIterator {
return &fileMeasurementSliceIterator{
measurementSliceIterator: measurementSliceIterator{
names: names,
},
fileIterators: itrs,
}
}
// MergeMeasurementIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements by later iterators.
@ -1320,17 +1356,24 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet {
// MeasurementNamesByExpr returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (_ [][]byte, err error) {
release := is.SeriesFile.Retain()
defer release()
// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByExpr(auth, expr)
itr, err := is.measurementNamesByExpr(auth, expr)
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
return slices.CopyChunkedByteSlices(names, 1000), nil
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil
}
itr, err := is.measurementIterator()
@ -1339,10 +1382,14 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx
} else if itr == nil {
return nil, nil
}
defer itr.Close()
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
// Iterate over all measurements if no condition exists.
var names [][]byte
// Iterate over all measurements if no condition exists.
for {
e, err := itr.Next()
if err != nil {
@ -1360,7 +1407,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx
return slices.CopyChunkedByteSlices(names, 1000), nil
}
func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementSliceIterator, error) {
if expr == nil {
return nil, nil
}
@ -1400,20 +1447,22 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx
return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex)
case influxql.OR, influxql.AND:
lhs, err := is.measurementNamesByExpr(auth, e.LHS)
if err != nil {
return nil, err
}
rhs, err := is.measurementNamesByExpr(auth, e.RHS)
if err != nil {
lhs.Close()
return nil, err
}
mis := MeasurementIterators{lhs, rhs}
if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
}
return bytesutil.Intersect(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
default:
return nil, fmt.Errorf("invalid tag comparison operator")
@ -1422,24 +1471,24 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx
case *influxql.ParenExpr:
return is.measurementNamesByExpr(auth, e.Expr)
default:
return nil, fmt.Errorf("Invalid measurement expression %#v", expr)
return nil, fmt.Errorf("invalid measurement expression %#v", expr)
}
}
// measurementNamesByNameFilter returns matching measurement names in sorted order.
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
itr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()
var names [][]byte
for {
e, err := itr.Next()
if err != nil {
itr.Close()
return nil, err
} else if e == nil {
break
@ -1462,24 +1511,31 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op in
}
}
bytesutil.Sort(names)
return names, nil
return newFileMeasurementSliceIterator(names, MeasurementIterators{itr}), nil
}
// MeasurementNamesByPredicate returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
// This behaves differently from MeasurementNamesByExpr because it will
// return measurements using flux predicates.
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (_ [][]byte, err error) {
release := is.SeriesFile.Retain()
defer release()
// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByPredicate(auth, expr)
itr, err := is.measurementNamesByPredicate(auth, expr)
if err != nil {
return nil, err
}
return slices.CopyChunkedByteSlices(names, 1000), nil
if itr != nil {
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
}
return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil
}
itr, err := is.measurementIterator()
@ -1488,10 +1544,14 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i
} else if itr == nil {
return nil, nil
}
defer itr.Close()
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
// Iterate over all measurements if no condition exists.
var names [][]byte
// Iterate over all measurements if no condition exists.
for {
e, err := itr.Next()
if err != nil {
@ -1509,7 +1569,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i
return slices.CopyChunkedByteSlices(names, 1000), nil
}
func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementSliceIterator, error) {
if expr == nil {
return nil, nil
}
@ -1553,16 +1613,17 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i
if err != nil {
return nil, err
}
rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
if err != nil {
lhs.Close()
return nil, err
}
mis := MeasurementIterators{lhs, rhs}
if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
}
return bytesutil.Intersect(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
default:
return nil, fmt.Errorf("invalid tag comparison operator")
@ -1575,8 +1636,9 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i
}
}
func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
var names [][]byte
failed := true
mitr, err := is.measurementIterator()
if err != nil {
@ -1584,7 +1646,11 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op inf
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
defer func() {
if failed {
mitr.Close()
}
}()
// valEqual determines if the provided []byte is equal to the tag value
// to be filtered on.
@ -1699,11 +1765,13 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op inf
}
bytesutil.Sort(names)
return names, nil
failed = false
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil
}
func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
var names [][]byte
failed := true
mitr, err := is.measurementIterator()
if err != nil {
@ -1711,7 +1779,11 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
defer func() {
if failed {
mitr.Close()
}
}()
var checkMeasurement func(auth query.FineAuthorizer, me []byte) (bool, error)
switch op {
@ -1762,7 +1834,8 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op
}
bytesutil.Sort(names)
return names, nil
failed = false
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil
}
// measurementAuthorizedSeries determines if the measurement contains a series