fix: panic index out of range for invalid series keys (#24565)

* chore: add scaffolding for naive solution

* feat: test case scaffolding

* fix: implement check for series key before proceeding

* fix: add validation for ReadSeriesKeyMeasurement usage

* refactor: explicit use of series key len

* feat: add remaining check to index

* feat: add check to remaining files

Because the ReadSeriesKeyLen function is used as part of parseSeriesKey, the nil return from parseSeriesKey also needs to be accounted for, since that function is called from several different contexts.
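
For context, a minimal sketch of the caller-side guard this implies, assuming the influxdb tsdb and models packages are imported; safeParseSeriesKey is a hypothetical helper name, not part of the change:

// parseSeriesKey returns a nil name when ReadSeriesKeyLen sees an
// empty or invalid buffer, so callers must not use the result blindly.
func safeParseSeriesKey(seriesKey []byte) ([]byte, models.Tags, bool) {
	name, tags := tsdb.ParseSeriesKey(seriesKey)
	if len(name) == 0 {
		return nil, nil, false // skip the entry or return an error at the call site
	}
	return name, tags, true
}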

* feat: expand test cases

* chore: go fmt

* chore: update test failure message

* chore: impl feedback on unnecessary sz checks

* feat: expand test cases

* fix: nil series key check

In both sections of index.go there is a pre-existing length check against the series key that should catch invalid values, which perhaps explains why this hasn't cropped up in the reported panics. For extra safety, we also skip a nil key, because we know that subsequent calls will panic wherever that key is used.
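
Roughly, the guarded iteration looks like the following sketch; walkSeriesKeys, seriesKeys and handle are illustrative placeholders, and the tsdb and models imports are assumed:

// walkSeriesKeys shows the two layers of protection: the pre-existing
// length check on the raw key, plus skipping a key whose name fails to parse.
func walkSeriesKeys(seriesKeys [][]byte, handle func(name []byte, tags models.Tags)) {
	for _, seriesKey := range seriesKeys {
		if len(seriesKey) == 0 {
			continue // a nil/empty key would panic in the calls below
		}
		name, tags := tsdb.ParseSeriesKey(seriesKey)
		if len(name) == 0 {
			continue // unparseable key; using it later would also panic
		}
		handle(name, tags)
	}
}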

* fix: remove nil tags check

A key with no tags is valid, so we should not require BOTH the key and the tags to be nil: a key could be nil (and therefore invalid) yet still carry tags, which would let the combined check pass when it should not.
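
A fragment of the call-site loop body to spell out the distinction (the rejected variant is shown only as a comment):

name, tags := tsdb.ParseSeriesKey(seriesKey)
// Rejected: requiring both to be nil lets an invalid nil name slip through
// whenever tags happen to be present.
//   if name == nil && tags == nil { continue }
// Kept: the name alone decides validity; nil or empty tags are fine.
if len(name) == 0 {
	continue
}
_ = tags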

* feat: extend test cases from feedback

* fix: extend checks for CompareSeriesKeys

* feat: add nilKeyHandler for shared key checking logic

* fix: logical error in nilKeyHandler

Prior to this, the final else branch was always taken at the end of the conditional, which caused unexpected behaviour and a number of test failures.
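
Illustrative check of the post-fix behaviour of CompareSeriesKeys, assuming the fmt and tsdb imports; "cpu" is an arbitrary measurement name:

valid := tsdb.AppendSeriesKey(nil, []byte("cpu"), nil)
fmt.Println(tsdb.CompareSeriesKeys(nil, nil))   // 0: both keys invalid, treated as equal
fmt.Println(tsdb.CompareSeriesKeys(nil, valid)) // -1: an invalid key sorts first
fmt.Println(tsdb.CompareSeriesKeys(valid, nil)) // 1
// Two valid keys skip both guards (their lengths sum to more than 1) and
// fall through to the normal measurement/tag comparison.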

* fix: return tags keep nil data

In a recent change we agreed on a simple name == nil check for the actual data. As a follow-on, I realised that we don't actually want to nil out the tags, even though they aren't checked: having no tags is valid input, so we can simply return whatever tags we were passed, unchanged.
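
Inside parseSeriesKey that ends up as the following fragment, where dst is the caller-supplied tags value:

_, data = ReadSeriesKeyLen(data)
if len(data) == 0 {
	// Signal the invalid key through the nil name only; dst is handed back
	// untouched because "no tags" is itself a valid input.
	return nil, dst // not: return nil, nil
}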

* fix: use len == 0 for extra safety
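
This works because len reports 0 for both a nil slice and an allocated-but-empty one, so a single check covers a missing key and a blank key. A standalone illustration:

package main

import "fmt"

func main() {
	var missing []byte // nil slice
	blank := []byte{}  // zero-length, non-nil slice
	fmt.Println(len(missing) == 0, len(blank) == 0) // true true
	fmt.Println(missing == nil, blank == nil)       // true false
}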

* feat: extra test for blank series key
Jack 2024-01-23 09:44:29 +00:00 committed by GitHub
parent c05b340b72
commit 6af0be9234
7 changed files with 117 additions and 8 deletions

@@ -244,6 +244,9 @@ func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error {
break
}
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if len(name) == 0 {
continue
}
if !cmd.matchSeries(name, tags) {
continue
@@ -368,6 +371,9 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet
}
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if len(name) == 0 {
continue
}
if !cmd.matchSeries(name, tags) {
continue

@@ -137,6 +137,9 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
}
name, tags := ParseSeriesKey(key)
if len(name) == 0 {
continue
}
deleted := itr.sfile.IsDeleted(elem.SeriesID)
return &seriesElemAdapter{
name: name,
@@ -381,6 +384,9 @@ func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
key := string(models.MakeKey(name, tags))
// Write auxiliary fields.
@@ -867,6 +873,9 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
}
name, tags := ParseSeriesKey(itr.keys[0])
if len(name) == 0 {
continue
}
itr.keys = itr.keys[1:]
// TODO(edd): It seems to me like this authorisation check should be
@@ -2340,6 +2349,9 @@ func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
}
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
@@ -2442,6 +2454,11 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}
name, tags := ParseSeriesKey(seriesKey)
// An invalid series key of 0 length should have been caught by the
// above check, but for extra safety we can pass over it here too.
if len(name) == 0 {
continue
}
keys = append(keys, models.MakeKey(name, tags))
}
@@ -2918,12 +2935,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,
if auth != nil {
name, tags := ParseSeriesKey(buf)
if len(name) == 0 {
continue
}
if !auth.AuthorizeSeriesRead(database, name, tags) {
continue
}
}
_, buf = ReadSeriesKeyLen(buf)
if len(buf) == 0 {
continue
}
_, buf = ReadSeriesKeyMeasurement(buf)
tagN, buf := ReadSeriesKeyTagN(buf)
for i := 0; i < tagN; i++ {

@@ -713,6 +713,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
if len(remainder) == 0 {
return
}
// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)

@@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"
@@ -112,6 +113,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
}
cur.row.Name, cur.row.Tags = ParseSeriesKey(cur.keys[cur.ofs])
if len(cur.row.Name) == 0 {
return nil, fmt.Errorf("series key was not valid: %+v", cur.keys[cur.ofs])
}
cur.ofs++
//if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {

@@ -414,6 +414,9 @@ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags)
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
_, data = ReadSeriesKeyLen(data)
if len(data) == 0 {
return nil, dst
}
name, data = ReadSeriesKeyMeasurement(data)
tagN, data := ReadSeriesKeyTagN(data)
@@ -436,17 +439,30 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
func CompareSeriesKeys(a, b []byte) int {
// Handle 'nil' keys.
-if len(a) == 0 && len(b) == 0 {
-return 0
-} else if len(a) == 0 {
-return -1
-} else if len(b) == 0 {
-return 1
+// Values below and equal to 1 are considered invalid for key comparisons.
+nilKeyHandler := func(a, b int) int {
+if a == 0 && b == 0 {
+return 0
+} else if a == 0 {
+return -1
+} else if b == 0 {
+return 1
+}
+return a + b
+}
+keyValidity := nilKeyHandler(len(a), len(b))
+if keyValidity <= 1 {
+return keyValidity
+}
// Read total size.
-_, a = ReadSeriesKeyLen(a)
-_, b = ReadSeriesKeyLen(b)
+szA, a := ReadSeriesKeyLen(a)
+szB, b := ReadSeriesKeyLen(b)
+keySizeValidity := nilKeyHandler(szA, szB)
+if keySizeValidity <= 1 {
+return keySizeValidity
+}
// Read names.
name0, a := ReadSeriesKeyMeasurement(a)

@@ -50,6 +50,57 @@ func TestParseSeriesKeyInto(t *testing.T) {
}
}
func TestParseSeriesKey(t *testing.T) {
tests := []struct {
name string
seriesKey []byte
expectedKey []byte
}{
{
name: "invalid zero length series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, nil),
expectedKey: nil,
},
{
name: "invalid blank series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte(""), nil),
expectedKey: nil,
},
{
name: "invalid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: nil,
},
{
name: "valid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with empty tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with nil tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(nil)),
expectedKey: []byte("foo"),
},
{
name: "valid series key with no tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), nil),
expectedKey: []byte("foo"),
},
}
for _, tt := range tests {
keyName, _ := tsdb.ParseSeriesKey(tt.seriesKey)
if res := bytes.Compare(keyName, tt.expectedKey); res != 0 {
t.Fatalf("invalid series key parsed for an %s: got %q, expected %q", tt.name, keyName, tt.expectedKey)
}
}
}
// Ensure that broken series files are closed
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0})

@@ -264,7 +264,13 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
newKeyRanges = append(newKeyRanges, keyRange{id, offset})
if tracker.AddedMeasurementSeries != nil {
_, remainder := ReadSeriesKeyLen(key)
if len(remainder) == 0 {
continue
}
measurement, _ := ReadSeriesKeyMeasurement(remainder)
if len(measurement) == 0 {
continue
}
tracker.AddedMeasurementSeries(measurement, 1)
}
}