fix(tsi1): index defect with negated equality filters
Fixes #15859. This commit fixes a defect in the TSI index where a filter using the negated equality operator would result in no matching series being returned for series stored within the `IndexFile` portions of the index. The root cause of this was missing legacy-handling code in the index for this particular iterator.
pull/15860/head
parent
888ce50ef8
commit
0dd2d38eac
|
|
@ -331,15 +331,17 @@ func (fs *FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, r
|
|||
}
|
||||
|
||||
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
|
||||
func (fs *FileSet) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
|
||||
func (fs *FileSet) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
|
||||
for _, f := range fs.files {
|
||||
itr := f.TagKeySeriesIDIterator(name, key)
|
||||
if itr != nil {
|
||||
itr, err := f.TagKeySeriesIDIterator(name, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
}
|
||||
}
|
||||
return tsdb.MergeSeriesIDIterators(a...)
|
||||
return tsdb.MergeSeriesIDIterators(a...), nil
|
||||
}
|
||||
|
||||
// HasTagKey returns true if the tag key exists.
|
||||
|
|
@ -423,7 +425,7 @@ type File interface {
|
|||
|
||||
// Series iteration.
|
||||
MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
|
||||
TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
|
||||
TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error)
|
||||
TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error)
|
||||
|
||||
// Bitmap series existence.
|
||||
|
|
|
|||
|
|
@ -309,27 +309,36 @@ func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator {
|
|||
|
||||
// TagKeySeriesIDIterator returns a series iterator for a tag key and a flag
|
||||
// indicating if a tombstone exists on the measurement or key.
|
||||
func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
|
||||
func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
|
||||
tblk := f.tblks[string(name)]
|
||||
if tblk == nil {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Find key element.
|
||||
ke := tblk.TagKeyElem(key)
|
||||
if ke == nil {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Merge all value series iterators together.
|
||||
vitr := ke.TagValueIterator()
|
||||
|
||||
var itrs []tsdb.SeriesIDIterator
|
||||
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
|
||||
sitr := &rawSeriesIDIterator{data: ve.(*TagBlockValueElem).series.data}
|
||||
itrs = append(itrs, sitr)
|
||||
tblk, ok := ve.(*TagBlockValueElem)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("got type %T for iterator, expected %T", ve, TagBlockValueElem{})
|
||||
}
|
||||
|
||||
ss, err := tblk.SeriesIDSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
itrs = append(itrs, tsdb.NewSeriesIDSetIterator(ss))
|
||||
}
|
||||
|
||||
return tsdb.MergeSeriesIDIterators(itrs...)
|
||||
return tsdb.MergeSeriesIDIterators(itrs...), nil
|
||||
}
|
||||
|
||||
// TagValueSeriesIDSet returns a series id set for a tag value.
|
||||
|
|
|
|||
|
|
@ -3,7 +3,10 @@ package tsi1_test
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
|
@ -32,6 +35,55 @@ func TestCreateIndexFile(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndexFile_TagKeySeriesIDIterator(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
f, err := CreateIndexFile(sfile.SeriesFile, []Series{
|
||||
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"}), Type: models.Integer},
|
||||
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"}), Type: models.Integer},
|
||||
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"}), Type: models.Integer},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
itr, err := f.TagKeySeriesIDIterator([]byte("cpu"), []byte("region"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer itr.Close()
|
||||
|
||||
// NOTE(edd): the series keys end up being emitted in this order because the
|
||||
// series were written to different partitions in the _series file_. As such,
|
||||
// the key with region=west ends up with a lower series ID than the region=east
|
||||
// series, even though it was written later. When the series id sets for each
|
||||
// tag block in the index file are merged together and iterated, the roaring
|
||||
// bitmap library sorts the series ids, resulting in the series keys being
|
||||
// emitted in a different order to that which they were written.
|
||||
exp := []string{"cpu,region=west", "cpu,region=east"}
|
||||
var got []string
|
||||
for {
|
||||
e, err := itr.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if e.SeriesID.ID == 0 {
|
||||
break
|
||||
}
|
||||
fmt.Println(e.SeriesID.ID)
|
||||
|
||||
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
|
||||
got = append(got, string(models.MustNewPoint(string(name), tags, models.Fields{"a": "a"}, time.Time{}).Key()))
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got keys %v, expected %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure index file generation can be successfully built.
|
||||
func TestGenerateIndexFile(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
|
|
|
|||
|
|
@ -353,18 +353,18 @@ func (f *LogFile) DeleteMeasurement(name []byte) error {
|
|||
}
|
||||
|
||||
// TagKeySeriesIDIterator returns a series iterator for a tag key.
|
||||
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
|
||||
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
||||
mm, ok := f.mms[string(name)]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
tk, ok := mm.tagSet[string(key)]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Combine iterators across all tag keys.
|
||||
|
|
@ -376,7 +376,7 @@ func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
|
|||
itrs = append(itrs, tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()))
|
||||
}
|
||||
|
||||
return tsdb.MergeSeriesIDIterators(itrs...)
|
||||
return tsdb.MergeSeriesIDIterators(itrs...), nil
|
||||
}
|
||||
|
||||
// TagKeyIterator returns a value iterator for a measurement.
|
||||
|
|
|
|||
|
|
@ -820,7 +820,13 @@ func (p *Partition) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDItera
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newFileSetSeriesIDIterator(fs, fs.TagKeySeriesIDIterator(name, key)), nil
|
||||
|
||||
itr, err := fs.TagKeySeriesIDIterator(name, key)
|
||||
if err != nil {
|
||||
fs.Release()
|
||||
return nil, err
|
||||
}
|
||||
return newFileSetSeriesIDIterator(fs, itr), nil
|
||||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a single key value.
|
||||
|
|
|
|||
Loading…
Reference in New Issue