Return created series ids from LogFile insertion

pull/10234/head
Edd Robinson 2018-08-07 13:38:13 +01:00
parent 52b5640a4a
commit 065d47e4f2
3 changed files with 74 additions and 25 deletions

@@ -509,10 +509,10 @@ func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
}
// AddSeriesList adds a list of series to the log file in bulk.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) error {
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
if err != nil {
return err
return nil, err
}
var writeRequired bool
@@ -521,6 +521,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
for i := range names {
if seriesSet.ContainsNoLock(seriesIDs[i]) {
// We don't need to allocate anything for this series.
seriesIDs[i] = 0
continue
}
writeRequired = true
@@ -530,7 +531,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
// Exit if all series already exist.
if !writeRequired {
return nil
return seriesIDs, nil
}
f.mu.Lock()
@@ -539,21 +540,25 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
seriesSet.Lock()
defer seriesSet.Unlock()
for i := range entries {
for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
entry := &entries[i]
if seriesSet.ContainsNoLock(entry.SeriesID) {
// We don't need to allocate anything for this series.
seriesIDs[i] = 0
continue
}
if err := f.appendEntry(entry); err != nil {
return err
return nil, err
}
f.execEntry(entry)
seriesSet.AddNoLock(entry.SeriesID)
}
// Flush buffer and sync to disk.
return f.FlushAndSync()
if err := f.FlushAndSync(); err != nil {
return nil, err
}
return seriesIDs, nil
}
// DeleteSeriesID adds a tombstone for a series id.

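In short, AddSeriesList now returns the series id slice alongside the error: an id of 0 marks an input series that was already present in the supplied SeriesIDSet (nothing was written for it), while non-zero entries are the ids recorded in the log file. A minimal caller-side sketch of that contract (hypothetical code, not part of this commit; newlyCreated is an illustrative helper name):

// newlyCreated is a hypothetical helper showing how a caller could interpret
// the ids returned by AddSeriesList: a zero id means the series was already
// in the set, any other value is the id written for names[i].
func newlyCreated(ids []uint64, names [][]byte) [][]byte {
	var created [][]byte
	for i, id := range ids {
		if id == 0 {
			continue // already present; no log entry was written for it
		}
		created = append(created, names[i])
	}
	return created
}

This mirrors what the updated test below asserts: two fresh series yield ids matching the SeriesIDSet, a repeat insert yields a non-zero id for the new series and a zero for the duplicate, and inserting only duplicates yields an all-zero slice.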
@@ -7,12 +7,15 @@ import (
"math/rand"
"os"
"path/filepath"
"reflect"
"regexp"
"runtime/pprof"
"sort"
"testing"
"time"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bloom"
"github.com/influxdata/influxdb/tsdb"
@@ -29,18 +32,58 @@ func TestLogFile_AddSeriesList(t *testing.T) {
seriesSet := tsdb.NewSeriesIDSet()
// Add test data.
if err := f.AddSeriesList(seriesSet, [][]byte{
[]byte("mem"),
[]byte("cpu"),
[]byte("cpu"),
}, []models.Tags{
{{Key: []byte("host"), Value: []byte("serverA")}},
{{Key: []byte("region"), Value: []byte("us-east")}},
{{Key: []byte("region"), Value: []byte("us-west")}},
}); err != nil {
ids, err := f.AddSeriesList(seriesSet,
slices.StringsToBytes("cpu", "mem"),
[]models.Tags{
models.NewTags(map[string]string{"region": "us-east"}),
models.NewTags(map[string]string{"host": "serverA"}),
},
)
if err != nil {
t.Fatal(err)
}
// Returned series ids should match those in the seriesSet.
other := tsdb.NewSeriesIDSet(ids...)
if !other.Equals(seriesSet) {
t.Fatalf("got series ids %s, expected %s", other, seriesSet)
}
// Add the same series again with a new one.
ids, err = f.AddSeriesList(seriesSet,
slices.StringsToBytes("cpu", "mem"),
[]models.Tags{
models.NewTags(map[string]string{"region": "us-west"}),
models.NewTags(map[string]string{"host": "serverA"}),
},
)
if err != nil {
t.Fatal(err)
}
if got, exp := len(ids), 2; got != exp {
t.Fatalf("got %d series ids, expected %d", got, exp)
} else if got := ids[0]; got == 0 {
t.Error("series id was 0, expected it not to be")
} else if got := ids[1]; got != 0 {
t.Errorf("got series id %d, expected 0", got)
}
// Add only the same series IDs.
ids, err = f.AddSeriesList(seriesSet,
slices.StringsToBytes("cpu", "mem"),
[]models.Tags{
models.NewTags(map[string]string{"region": "us-west"}),
models.NewTags(map[string]string{"host": "serverA"}),
},
)
if got, exp := ids, make([]uint64, 2); !reflect.DeepEqual(got, exp) {
t.Fatalf("got ids %v, expected %v", got, exp)
}
// Verify data.
itr := f.MeasurementIterator()
if e := itr.Next(); e == nil || string(e.Name()) != "cpu" {
@@ -82,7 +125,7 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) {
tv := fmt.Sprintf("server-%d", rand.Intn(50)) // Encourage adding duplicate series.
tvm[tv] = struct{}{}
if err := f.AddSeriesList(seriesSet, [][]byte{
if _, err := f.AddSeriesList(seriesSet, [][]byte{
[]byte("mem"),
[]byte("cpu"),
}, []models.Tags{
@@ -133,7 +176,7 @@ func TestLogFile_DeleteMeasurement(t *testing.T) {
seriesSet := tsdb.NewSeriesIDSet()
// Add test data.
if err := f.AddSeriesList(seriesSet, [][]byte{
if _, err := f.AddSeriesList(seriesSet, [][]byte{
[]byte("mem"),
[]byte("cpu"),
[]byte("cpu"),
@@ -172,7 +215,7 @@ func TestLogFile_Open(t *testing.T) {
defer f.Close()
// Add test data & close.
if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
t.Fatal(err)
} else if err := f.LogFile.Close(); err != nil {
t.Fatal(err)
@@ -200,7 +243,7 @@ func TestLogFile_Open(t *testing.T) {
}
// Add more data & reopen.
if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil {
if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil {
t.Fatal(err)
} else if err := f.Reopen(); err != nil {
t.Fatal(err)
@@ -232,7 +275,7 @@ func TestLogFile_Open(t *testing.T) {
defer f.Close()
// Add test data & close.
if err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
t.Fatal(err)
} else if err := f.LogFile.Close(); err != nil {
t.Fatal(err)
@@ -313,7 +356,7 @@ func CreateLogFile(sfile *tsdb.SeriesFile, series []Series) (*LogFile, error) {
f := MustOpenLogFile(sfile)
seriesSet := tsdb.NewSeriesIDSet()
for _, serie := range series {
if err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil {
if _, err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil {
return nil, err
}
}
@@ -338,7 +381,7 @@ func GenerateLogFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) (*L
value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN)))
tags = append(tags, models.NewTag(key, value))
}
if err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil {
if _, err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil {
return nil, err
}
}
@@ -385,7 +428,7 @@ func benchmarkLogFile_AddSeries(b *testing.B, measurementN, seriesKeyN, seriesVa
for i := 0; i < b.N; i++ {
for _, d := range data {
if err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil {
if _, err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil {
b.Fatal(err)
}
}
@@ -417,7 +460,7 @@ func BenchmarkLogFile_WriteTo(b *testing.B) {
// Initialize log file with series data.
for i := 0; i < seriesN; i++ {
if err := f.AddSeriesList(
if _, err := f.AddSeriesList(
seriesSet,
[][]byte{[]byte("cpu")},
[]models.Tags{{

@@ -655,7 +655,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
// Ensure fileset cannot change during insert.
p.mu.RLock()
if err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil {
// Insert series into log file.
if _, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil {
p.mu.RUnlock()
return err
}
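The Partition change above only adapts the call site: the returned ids are discarded with the blank identifier and createSeriesListIfNotExists keeps its error-only signature. If a caller later needed the ids, they could be captured and filtered along these lines (a hypothetical sketch, not part of this commit; newIDs is an illustrative name):

// Hypothetical variant of the call above; this commit deliberately discards
// the returned ids with the blank identifier, so this is illustration only.
ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice)
if err != nil {
	p.mu.RUnlock()
	return err
}
newIDs := make([]uint64, 0, len(ids))
for _, id := range ids {
	if id != 0 { // zero marks a series that was already in p.seriesIDSet
		newIDs = append(newIDs, id)
	}
}
// newIDs could then be propagated up from createSeriesListIfNotExists.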