From 065d47e4f249746469be1f10af33e2cd812a7a8f Mon Sep 17 00:00:00 2001
From: Edd Robinson <me@edd.io>
Date: Tue, 7 Aug 2018 13:38:13 +0100
Subject: [PATCH] Return created series ids from LogFile insertion

---
 tsdb/index/tsi1/log_file.go      | 17 ++++---
 tsdb/index/tsi1/log_file_test.go | 79 ++++++++++++++++++++++++--------
 tsdb/index/tsi1/partition.go     |  3 +-
 3 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go
index 16a8d70dbf..176345a1f4 100644
--- a/tsdb/index/tsi1/log_file.go
+++ b/tsdb/index/tsi1/log_file.go
@@ -509,10 +509,10 @@ func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
 }
 
 // AddSeriesList adds a list of series to the log file in bulk.
-func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) error {
+func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
 	seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	var writeRequired bool
@@ -521,6 +521,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
 	for i := range names {
 		if seriesSet.ContainsNoLock(seriesIDs[i]) {
 			// We don't need to allocate anything for this series.
+			seriesIDs[i] = 0
 			continue
 		}
 		writeRequired = true
@@ -530,7 +531,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
 
 	// Exit if all series already exist.
 	if !writeRequired {
-		return nil
+		return seriesIDs, nil
 	}
 
 	f.mu.Lock()
@@ -539,21 +540,25 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
 	seriesSet.Lock()
 	defer seriesSet.Unlock()
 
-	for i := range entries {
+	for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
 		entry := &entries[i]
 		if seriesSet.ContainsNoLock(entry.SeriesID) {
 			// We don't need to allocate anything for this series.
+			seriesIDs[i] = 0
 			continue
 		}
 		if err := f.appendEntry(entry); err != nil {
-			return err
+			return nil, err
 		}
 		f.execEntry(entry)
 		seriesSet.AddNoLock(entry.SeriesID)
 	}
 
 	// Flush buffer and sync to disk.
-	return f.FlushAndSync()
+	if err := f.FlushAndSync(); err != nil {
+		return nil, err
+	}
+	return seriesIDs, nil
 }
 
 // DeleteSeriesID adds a tombstone for a series id.
diff --git a/tsdb/index/tsi1/log_file_test.go b/tsdb/index/tsi1/log_file_test.go
index 85d05708d2..4410a8e017 100644
--- a/tsdb/index/tsi1/log_file_test.go
+++ b/tsdb/index/tsi1/log_file_test.go
@@ -7,12 +7,15 @@ import (
 	"math/rand"
 	"os"
 	"path/filepath"
+	"reflect"
 	"regexp"
 	"runtime/pprof"
 	"sort"
 	"testing"
 	"time"
 
+	"github.com/influxdata/influxdb/pkg/slices"
+
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/pkg/bloom"
 	"github.com/influxdata/influxdb/tsdb"
@@ -29,18 +32,58 @@ func TestLogFile_AddSeriesList(t *testing.T) {
 	seriesSet := tsdb.NewSeriesIDSet()
 
 	// Add test data.
-	if err := f.AddSeriesList(seriesSet, [][]byte{
-		[]byte("mem"),
-		[]byte("cpu"),
-		[]byte("cpu"),
-	}, []models.Tags{
-		{{Key: []byte("host"), Value: []byte("serverA")}},
-		{{Key: []byte("region"), Value: []byte("us-east")}},
-		{{Key: []byte("region"), Value: []byte("us-west")}},
-	}); err != nil {
+	ids, err := f.AddSeriesList(seriesSet,
+		slices.StringsToBytes("cpu", "mem"),
+		[]models.Tags{
+			models.NewTags(map[string]string{"region": "us-east"}),
+			models.NewTags(map[string]string{"host": "serverA"}),
+		},
+	)
+
+	if err != nil {
 		t.Fatal(err)
 	}
 
+	// Returned series ids should match those in the seriesSet.
+	other := tsdb.NewSeriesIDSet(ids...)
+	if !other.Equals(seriesSet) {
+		t.Fatalf("got series ids %s, expected %s", other, seriesSet)
+	}
+
+	// Add a new series (cpu,region=us-west) together with one that already exists (mem,host=serverA).
+	ids, err = f.AddSeriesList(seriesSet,
+		slices.StringsToBytes("cpu", "mem"),
+		[]models.Tags{
+			models.NewTags(map[string]string{"region": "us-west"}),
+			models.NewTags(map[string]string{"host": "serverA"}),
+		},
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if got, exp := len(ids), 2; got != exp {
+		t.Fatalf("got %d series ids, expected %d", got, exp)
+	} else if got := ids[0]; got == 0 {
+		t.Error("series id was 0, expected it not to be")
+	} else if got := ids[1]; got != 0 {
+		t.Errorf("got series id %d, expected 0", got)
+	}
+
+	// Add only series that already exist; every returned id should be 0.
+	ids, err = f.AddSeriesList(seriesSet,
+		slices.StringsToBytes("cpu", "mem"),
+		[]models.Tags{
+			models.NewTags(map[string]string{"region": "us-west"}),
+			models.NewTags(map[string]string{"host": "serverA"}),
+		},
+	)
+
+	if got, exp := ids, make([]uint64, 2); !reflect.DeepEqual(got, exp) {
+		t.Fatalf("got ids %v, expected %v", got, exp)
+	}
+
 	// Verify data.
 	itr := f.MeasurementIterator()
 	if e := itr.Next(); e == nil || string(e.Name()) != "cpu" {
@@ -82,7 +125,7 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) {
 		tv := fmt.Sprintf("server-%d", rand.Intn(50)) // Encourage adding duplicate series.
 		tvm[tv] = struct{}{}
 
-		if err := f.AddSeriesList(seriesSet, [][]byte{
+		if _, err := f.AddSeriesList(seriesSet, [][]byte{
 			[]byte("mem"),
 			[]byte("cpu"),
 		}, []models.Tags{
@@ -133,7 +176,7 @@ func TestLogFile_DeleteMeasurement(t *testing.T) {
 	seriesSet := tsdb.NewSeriesIDSet()
 
 	// Add test data.
-	if err := f.AddSeriesList(seriesSet, [][]byte{
+	if _, err := f.AddSeriesList(seriesSet, [][]byte{
 		[]byte("mem"),
 		[]byte("cpu"),
 		[]byte("cpu"),
@@ -172,7 +215,7 @@ func TestLogFile_Open(t *testing.T) {
 		defer f.Close()
 
 		// Add test data & close.
-		if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
+		if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
 			t.Fatal(err)
 		} else if err := f.LogFile.Close(); err != nil {
 			t.Fatal(err)
@@ -200,7 +243,7 @@ func TestLogFile_Open(t *testing.T) {
 		}
 
 		// Add more data & reopen.
-		if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil {
+		if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil {
 			t.Fatal(err)
 		} else if err := f.Reopen(); err != nil {
 			t.Fatal(err)
@@ -232,7 +275,7 @@ func TestLogFile_Open(t *testing.T) {
 		defer f.Close()
 
 		// Add test data & close.
-		if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
+		if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
 			t.Fatal(err)
 		} else if err := f.LogFile.Close(); err != nil {
 			t.Fatal(err)
@@ -313,7 +356,7 @@ func CreateLogFile(sfile *tsdb.SeriesFile, series []Series) (*LogFile, error) {
 	f := MustOpenLogFile(sfile)
 	seriesSet := tsdb.NewSeriesIDSet()
 	for _, serie := range series {
-		if err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil {
+		if _, err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil {
 			return nil, err
 		}
 	}
@@ -338,7 +381,7 @@ func GenerateLogFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) (*L
 				value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN)))
 				tags = append(tags, models.NewTag(key, value))
 			}
-			if err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil {
+			if _, err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil {
 				return nil, err
 			}
 		}
@@ -385,7 +428,7 @@ func benchmarkLogFile_AddSeries(b *testing.B, measurementN, seriesKeyN, seriesVa
 
 	for i := 0; i < b.N; i++ {
 		for _, d := range data {
-			if err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil {
+			if _, err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil {
 				b.Fatal(err)
 			}
 		}
@@ -417,7 +460,7 @@ func BenchmarkLogFile_WriteTo(b *testing.B) {
 
 			// Initialize log file with series data.
 			for i := 0; i < seriesN; i++ {
-				if err := f.AddSeriesList(
+				if _, err := f.AddSeriesList(
 					seriesSet,
 					[][]byte{[]byte("cpu")},
 					[]models.Tags{{
diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go
index 31b08be537..43762385cc 100644
--- a/tsdb/index/tsi1/partition.go
+++ b/tsdb/index/tsi1/partition.go
@@ -655,7 +655,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
 
 	// Ensure fileset cannot change during insert.
 	p.mu.RLock()
-	if err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil {
+	// Insert series into log file.
+	if _, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil {
 		p.mu.RUnlock()
 		return err
 	}