diff --git a/tsdb/tsi1/file_set.go b/tsdb/tsi1/file_set.go index 6b11cc0a7a..48d9f785a9 100644 --- a/tsdb/tsi1/file_set.go +++ b/tsdb/tsi1/file_set.go @@ -331,15 +331,17 @@ func (fs *FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, r } // TagKeySeriesIDIterator returns a series iterator for all values across a single key. -func (fs *FileSet) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { +func (fs *FileSet) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { a := make([]tsdb.SeriesIDIterator, 0, len(fs.files)) for _, f := range fs.files { - itr := f.TagKeySeriesIDIterator(name, key) - if itr != nil { + itr, err := f.TagKeySeriesIDIterator(name, key) + if err != nil { + return nil, err + } else if itr != nil { a = append(a, itr) } } - return tsdb.MergeSeriesIDIterators(a...) + return tsdb.MergeSeriesIDIterators(a...), nil } // HasTagKey returns true if the tag key exists. @@ -423,7 +425,7 @@ type File interface { // Series iteration. MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator - TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator + TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) // Bitmap series existence. diff --git a/tsdb/tsi1/index_file.go b/tsdb/tsi1/index_file.go index 7881ec5d56..a253b07462 100644 --- a/tsdb/tsi1/index_file.go +++ b/tsdb/tsi1/index_file.go @@ -309,27 +309,36 @@ func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator { // TagKeySeriesIDIterator returns a series iterator for a tag key and a flag // indicating if a tombstone exists on the measurement or key. -func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { +func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { tblk := f.tblks[string(name)] if tblk == nil { - return nil + return nil, nil } // Find key element. ke := tblk.TagKeyElem(key) if ke == nil { - return nil + return nil, nil } // Merge all value series iterators together. vitr := ke.TagValueIterator() + var itrs []tsdb.SeriesIDIterator for ve := vitr.Next(); ve != nil; ve = vitr.Next() { - sitr := &rawSeriesIDIterator{data: ve.(*TagBlockValueElem).series.data} - itrs = append(itrs, sitr) + tblk, ok := ve.(*TagBlockValueElem) + if !ok { + return nil, fmt.Errorf("got type %T for iterator, expected %T", ve, TagBlockValueElem{}) + } + + ss, err := tblk.SeriesIDSet() + if err != nil { + return nil, err + } + itrs = append(itrs, tsdb.NewSeriesIDSetIterator(ss)) } - return tsdb.MergeSeriesIDIterators(itrs...) + return tsdb.MergeSeriesIDIterators(itrs...), nil } // TagValueSeriesIDSet returns a series id set for a tag value. diff --git a/tsdb/tsi1/index_file_test.go b/tsdb/tsi1/index_file_test.go index e687da8dcf..23c1bb4b12 100644 --- a/tsdb/tsi1/index_file_test.go +++ b/tsdb/tsi1/index_file_test.go @@ -3,7 +3,10 @@ package tsi1_test import ( "bytes" "context" + "fmt" + "reflect" "testing" + "time" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" @@ -32,6 +35,55 @@ func TestCreateIndexFile(t *testing.T) { } } +func TestIndexFile_TagKeySeriesIDIterator(t *testing.T) { + sfile := MustOpenSeriesFile() + defer sfile.Close() + + f, err := CreateIndexFile(sfile.SeriesFile, []Series{ + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"}), Type: models.Integer}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"}), Type: models.Integer}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"}), Type: models.Integer}, + }) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + itr, err := f.TagKeySeriesIDIterator([]byte("cpu"), []byte("region")) + if err != nil { + t.Fatal(err) + } + defer itr.Close() + + // NOTE(edd): the series keys end up being emitted in this order because the + // series were written to different partitons in the _series file_. As such, + // the key with region=west ends up with a lower series ID than the region=east + // series, even though it was written later. When the series id sets for each + // tag block in the index file are merged together and iterated, the roaring + // bitmap library sorts the series ids, resulting the the series keys being + // emitted in a different order to that which they were written. + exp := []string{"cpu,region=west", "cpu,region=east"} + var got []string + for { + e, err := itr.Next() + if err != nil { + t.Fatal(err) + } + + if e.SeriesID.ID == 0 { + break + } + fmt.Println(e.SeriesID.ID) + + name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID)) + got = append(got, string(models.MustNewPoint(string(name), tags, models.Fields{"a": "a"}, time.Time{}).Key())) + } + + if !reflect.DeepEqual(got, exp) { + t.Fatalf("got keys %v, expected %v", got, exp) + } +} + // Ensure index file generation can be successfully built. func TestGenerateIndexFile(t *testing.T) { sfile := MustOpenSeriesFile() diff --git a/tsdb/tsi1/log_file.go b/tsdb/tsi1/log_file.go index 4516497933..d509f631d7 100644 --- a/tsdb/tsi1/log_file.go +++ b/tsdb/tsi1/log_file.go @@ -353,18 +353,18 @@ func (f *LogFile) DeleteMeasurement(name []byte) error { } // TagKeySeriesIDIterator returns a series iterator for a tag key. -func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { +func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { f.mu.RLock() defer f.mu.RUnlock() mm, ok := f.mms[string(name)] if !ok { - return nil + return nil, nil } tk, ok := mm.tagSet[string(key)] if !ok { - return nil + return nil, nil } // Combine iterators across all tag keys. @@ -376,7 +376,7 @@ func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator itrs = append(itrs, tsdb.NewSeriesIDSetIterator(tv.seriesIDSet())) } - return tsdb.MergeSeriesIDIterators(itrs...) + return tsdb.MergeSeriesIDIterators(itrs...), nil } // TagKeyIterator returns a value iterator for a measurement. diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go index debaeb6574..b47d389151 100644 --- a/tsdb/tsi1/partition.go +++ b/tsdb/tsi1/partition.go @@ -820,7 +820,13 @@ func (p *Partition) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDItera if err != nil { return nil, err } - return newFileSetSeriesIDIterator(fs, fs.TagKeySeriesIDIterator(name, key)), nil + + itr, err := fs.TagKeySeriesIDIterator(name, key) + if err != nil { + fs.Release() + return nil, err + } + return newFileSetSeriesIDIterator(fs, itr), nil } // TagValueSeriesIDIterator returns a series iterator for a single key value.