fix: panic index out of range for invalid series keys [Port to 2.7] (#24599)

pull/24619/head
Jack 2024-01-23 17:08:07 +00:00 committed by GitHub
parent 09a9607fd9
commit bfb09aadac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 105 additions and 8 deletions

View File

@ -126,6 +126,9 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
}
name, tags := ParseSeriesKey(key)
if len(name) == 0 {
continue
}
deleted := itr.sfile.IsDeleted(elem.SeriesID)
return &seriesElemAdapter{
name: name,
@ -390,6 +393,9 @@ func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
key := string(models.MakeKey(name, tags))
// Write auxiliary fields.
@ -886,6 +892,9 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
}
name, tags := ParseSeriesKey(itr.keys[0])
if len(name) == 0 {
continue
}
itr.keys = itr.keys[1:]
// TODO(edd): It seems to me like this authorisation check should be
@ -2327,6 +2336,9 @@ func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
}
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
@ -2428,6 +2440,11 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}
name, tags := ParseSeriesKey(seriesKey)
// An invalid series key of 0 length should have been caught by the
// above check, but for extra safety we can pass over it here too.
if len(name) == 0 {
continue
}
keys = append(keys, models.MakeKey(name, tags))
}
@ -2904,12 +2921,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
if auth != nil {
name, tags := ParseSeriesKey(buf)
if len(name) == 0 {
continue
}
if !auth.AuthorizeSeriesRead(database, name, tags) {
continue
}
}
_, buf = ReadSeriesKeyLen(buf)
if len(buf) == 0 {
continue
}
_, buf = ReadSeriesKeyMeasurement(buf)
tagN, buf := ReadSeriesKeyTagN(buf)
for i := 0; i < tagN; i++ {

View File

@ -693,6 +693,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
if len(remainder) == 0 {
return
}
// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)

View File

@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"
@ -112,6 +113,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
}
cur.row.Name, cur.row.Tags = ParseSeriesKey(cur.keys[cur.ofs])
if len(cur.row.Name) == 0 {
return nil, fmt.Errorf("series key was not valid: %+v", cur.keys[cur.ofs])
}
cur.ofs++
//if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {

View File

@ -414,6 +414,9 @@ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags)
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
_, data = ReadSeriesKeyLen(data)
if len(data) == 0 {
return nil, dst
}
name, data = ReadSeriesKeyMeasurement(data)
tagN, data := ReadSeriesKeyTagN(data)
@ -436,17 +439,30 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
func CompareSeriesKeys(a, b []byte) int {
// Handle 'nil' keys.
if len(a) == 0 && len(b) == 0 {
return 0
} else if len(a) == 0 {
return -1
} else if len(b) == 0 {
return 1
// Values below and equal to 1 are considered invalid for key comparisons.
nilKeyHandler := func(a, b int) int {
if a == 0 && b == 0 {
return 0
} else if a == 0 {
return -1
} else if b == 0 {
return 1
}
return a + b
}
keyValidity := nilKeyHandler(len(a), len(b))
if keyValidity <= 1 {
return keyValidity
}
// Read total size.
_, a = ReadSeriesKeyLen(a)
_, b = ReadSeriesKeyLen(b)
szA, a := ReadSeriesKeyLen(a)
szB, b := ReadSeriesKeyLen(b)
keySizeValidity := nilKeyHandler(szA, szB)
if keySizeValidity <= 1 {
return keySizeValidity
}
// Read names.
name0, a := ReadSeriesKeyMeasurement(a)

View File

@ -51,6 +51,57 @@ func TestParseSeriesKeyInto(t *testing.T) {
}
}
func TestParseSeriesKey(t *testing.T) {
tests := []struct {
name string
seriesKey []byte
expectedKey []byte
}{
{
name: "invalid zero length series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, nil),
expectedKey: nil,
},
{
name: "invalid blank series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte(""), nil),
expectedKey: nil,
},
{
name: "invalid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: nil,
},
{
name: "valid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with empty tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with nil tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(nil)),
expectedKey: []byte("foo"),
},
{
name: "valid series key with no tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), nil),
expectedKey: []byte("foo"),
},
}
for _, tt := range tests {
keyName, _ := tsdb.ParseSeriesKey(tt.seriesKey)
if res := bytes.Compare(keyName, tt.expectedKey); res != 0 {
t.Fatalf("invalid series key parsed for an %s: got %q, expected %q", tt.name, keyName, tt.expectedKey)
}
}
}
// Ensure that broken series files are closed
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
f := NewBrokenSeriesFile(t, []byte{0, 0, 0, 0, 0})