diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 16a8d70dbf..176345a1f4 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -509,10 +509,10 @@ func (f *LogFile) DeleteTagValue(name, key, value []byte) error { } // AddSeriesList adds a list of series to the log file in bulk. -func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) error { +func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) { seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice) if err != nil { - return err + return nil, err } var writeRequired bool @@ -521,6 +521,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag for i := range names { if seriesSet.ContainsNoLock(seriesIDs[i]) { // We don't need to allocate anything for this series. + seriesIDs[i] = 0 continue } writeRequired = true @@ -530,7 +531,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag // Exit if all series already exist. if !writeRequired { - return nil + return seriesIDs, nil } f.mu.Lock() @@ -539,21 +540,25 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag seriesSet.Lock() defer seriesSet.Unlock() - for i := range entries { + for i := range entries { // NB - this doesn't evaluate all series ids returned from series file. entry := &entries[i] if seriesSet.ContainsNoLock(entry.SeriesID) { // We don't need to allocate anything for this series. + seriesIDs[i] = 0 continue } if err := f.appendEntry(entry); err != nil { - return err + return nil, err } f.execEntry(entry) seriesSet.AddNoLock(entry.SeriesID) } // Flush buffer and sync to disk. - return f.FlushAndSync() + if err := f.FlushAndSync(); err != nil { + return nil, err + } + return seriesIDs, nil } // DeleteSeriesID adds a tombstone for a series id. diff --git a/tsdb/index/tsi1/log_file_test.go b/tsdb/index/tsi1/log_file_test.go index 85d05708d2..4410a8e017 100644 --- a/tsdb/index/tsi1/log_file_test.go +++ b/tsdb/index/tsi1/log_file_test.go @@ -7,12 +7,15 @@ import ( "math/rand" "os" "path/filepath" + "reflect" "regexp" "runtime/pprof" "sort" "testing" "time" + "github.com/influxdata/influxdb/pkg/slices" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/tsdb" @@ -29,18 +32,58 @@ func TestLogFile_AddSeriesList(t *testing.T) { seriesSet := tsdb.NewSeriesIDSet() // Add test data. - if err := f.AddSeriesList(seriesSet, [][]byte{ - []byte("mem"), - []byte("cpu"), - []byte("cpu"), - }, []models.Tags{ - {{Key: []byte("host"), Value: []byte("serverA")}}, - {{Key: []byte("region"), Value: []byte("us-east")}}, - {{Key: []byte("region"), Value: []byte("us-west")}}, - }); err != nil { + ids, err := f.AddSeriesList(seriesSet, + slices.StringsToBytes("cpu", "mem"), + []models.Tags{ + models.NewTags(map[string]string{"region": "us-east"}), + models.NewTags(map[string]string{"host": "serverA"}), + }, + ) + + if err != nil { t.Fatal(err) } + // Returned series ids should match those in the seriesSet. + other := tsdb.NewSeriesIDSet(ids...) + if !other.Equals(seriesSet) { + t.Fatalf("got series ids %s, expected %s", other, seriesSet) + } + + // Add the same series again with a new one. + ids, err = f.AddSeriesList(seriesSet, + slices.StringsToBytes("cpu", "mem"), + []models.Tags{ + models.NewTags(map[string]string{"region": "us-west"}), + models.NewTags(map[string]string{"host": "serverA"}), + }, + ) + + if err != nil { + t.Fatal(err) + } + + if got, exp := len(ids), 2; got != exp { + t.Fatalf("got %d series ids, expected %d", got, exp) + } else if got := ids[0]; got == 0 { + t.Error("series id was 0, expected it not to be") + } else if got := ids[1]; got != 0 { + t.Errorf("got series id %d, expected 0", got) + } + + // Add only the same series IDs. + ids, err = f.AddSeriesList(seriesSet, + slices.StringsToBytes("cpu", "mem"), + []models.Tags{ + models.NewTags(map[string]string{"region": "us-west"}), + models.NewTags(map[string]string{"host": "serverA"}), + }, + ) + + if got, exp := ids, make([]uint64, 2); !reflect.DeepEqual(got, exp) { + t.Fatalf("got ids %v, expected %v", got, exp) + } + // Verify data. itr := f.MeasurementIterator() if e := itr.Next(); e == nil || string(e.Name()) != "cpu" { @@ -82,7 +125,7 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) { tv := fmt.Sprintf("server-%d", rand.Intn(50)) // Encourage adding duplicate series. tvm[tv] = struct{}{} - if err := f.AddSeriesList(seriesSet, [][]byte{ + if _, err := f.AddSeriesList(seriesSet, [][]byte{ []byte("mem"), []byte("cpu"), }, []models.Tags{ @@ -133,7 +176,7 @@ func TestLogFile_DeleteMeasurement(t *testing.T) { seriesSet := tsdb.NewSeriesIDSet() // Add test data. - if err := f.AddSeriesList(seriesSet, [][]byte{ + if _, err := f.AddSeriesList(seriesSet, [][]byte{ []byte("mem"), []byte("cpu"), []byte("cpu"), @@ -172,7 +215,7 @@ func TestLogFile_Open(t *testing.T) { defer f.Close() // Add test data & close. - if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil { + if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil { t.Fatal(err) } else if err := f.LogFile.Close(); err != nil { t.Fatal(err) @@ -200,7 +243,7 @@ func TestLogFile_Open(t *testing.T) { } // Add more data & reopen. - if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil { + if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil { t.Fatal(err) } else if err := f.Reopen(); err != nil { t.Fatal(err) @@ -232,7 +275,7 @@ func TestLogFile_Open(t *testing.T) { defer f.Close() // Add test data & close. - if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil { + if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil { t.Fatal(err) } else if err := f.LogFile.Close(); err != nil { t.Fatal(err) @@ -313,7 +356,7 @@ func CreateLogFile(sfile *tsdb.SeriesFile, series []Series) (*LogFile, error) { f := MustOpenLogFile(sfile) seriesSet := tsdb.NewSeriesIDSet() for _, serie := range series { - if err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil { + if _, err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil { return nil, err } } @@ -338,7 +381,7 @@ func GenerateLogFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) (*L value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN))) tags = append(tags, models.NewTag(key, value)) } - if err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil { + if _, err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil { return nil, err } } @@ -385,7 +428,7 @@ func benchmarkLogFile_AddSeries(b *testing.B, measurementN, seriesKeyN, seriesVa for i := 0; i < b.N; i++ { for _, d := range data { - if err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil { + if _, err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil { b.Fatal(err) } } @@ -417,7 +460,7 @@ func BenchmarkLogFile_WriteTo(b *testing.B) { // Initialize log file with series data. for i := 0; i < seriesN; i++ { - if err := f.AddSeriesList( + if _, err := f.AddSeriesList( seriesSet, [][]byte{[]byte("cpu")}, []models.Tags{{ diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 31b08be537..43762385cc 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -655,7 +655,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode // Ensure fileset cannot change during insert. p.mu.RLock() - if err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil { + // Insert series into log file. + if _, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil { p.mu.RUnlock() return err }