Modify arguments to reduce allocations
parent
c2bbc18e4b
commit
2d59fb788c
|
@ -0,0 +1,62 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/tsdb/index/tsi1"
|
||||
)
|
||||
|
||||
// File is a mock implementation of a tsi1.File.
|
||||
type File struct {
|
||||
Pathf func() string
|
||||
Measurementf func(name []byte) tsi1.MeasurementElem
|
||||
MeasurementIteratorf func() tsi1.MeasurementIterator
|
||||
HasSeriesf func(name []byte, tags models.Tags) (exists, tombstoned bool)
|
||||
Seriesf func(name []byte, tags models.Tags) tsi1.SeriesElem
|
||||
SeriesNf func() uint64
|
||||
TagKeyf func(name, key []byte) tsi1.TagKeyElem
|
||||
TagKeyIteratorf func(name []byte) tsi1.TagKeyIterator
|
||||
TagValuef func(name, key, value []byte) tsi1.TagValueElem
|
||||
TagValueIteratorf func(name, key []byte) tsi1.TagValueIterator
|
||||
SeriesIteratorf func() tsi1.SeriesIterator
|
||||
MeasurementSeriesIteratorf func(name []byte) tsi1.SeriesIterator
|
||||
TagKeySeriesIteratorf func(name, key []byte) tsi1.SeriesIterator
|
||||
TagValueSeriesIteratorf func(name, key, value []byte) tsi1.SeriesIterator
|
||||
MergeSeriesSketchesf func(s, t estimator.Sketch) error
|
||||
MergeMeasurementsSketchesf func(s, t estimator.Sketch) error
|
||||
Retainf func()
|
||||
Releasef func()
|
||||
}
|
||||
|
||||
func (f *File) Path() string { return f.Pathf() }
|
||||
func (f *File) Measurement(name []byte) tsi1.MeasurementElem { return f.Measurementf(name) }
|
||||
func (f *File) MeasurementIterator() tsi1.MeasurementIterator { return f.MeasurementIteratorf() }
|
||||
func (f *File) HasSeries(name []byte, tags models.Tags) (exists, tombstoned bool) {
|
||||
return f.HasSeriesf(name, tags)
|
||||
}
|
||||
func (f *File) Series(name []byte, tags models.Tags) tsi1.SeriesElem { return f.Seriesf(name, tags) }
|
||||
func (f *File) SeriesN() uint64 { return f.SeriesNf() }
|
||||
func (f *File) TagKey(name, key []byte) tsi1.TagKeyElem { return f.TagKeyf(name, key) }
|
||||
func (f *File) TagKeyIterator(name []byte) tsi1.TagKeyIterator { return f.TagKeyIteratorf(name) }
|
||||
func (f *File) TagValue(name, key, value []byte) tsi1.TagValueElem {
|
||||
return f.TagValuef(name, key, value)
|
||||
}
|
||||
func (f *File) TagValueIterator(name, key []byte) tsi1.TagValueIterator {
|
||||
return f.TagValueIteratorf(name, key)
|
||||
}
|
||||
func (f *File) SeriesIterator() tsi1.SeriesIterator { return f.SeriesIteratorf() }
|
||||
func (f *File) MeasurementSeriesIterator(name []byte) tsi1.SeriesIterator {
|
||||
return f.MeasurementSeriesIteratorf(name)
|
||||
}
|
||||
func (f *File) TagKeySeriesIterator(name, key []byte) tsi1.SeriesIterator {
|
||||
return f.TagKeySeriesIteratorf(name, key)
|
||||
}
|
||||
func (f *File) TagValueSeriesIterator(name, key, value []byte) tsi1.SeriesIterator {
|
||||
return f.TagValueSeriesIteratorf(name, key, value)
|
||||
}
|
||||
func (f *File) MergeSeriesSketches(s, t estimator.Sketch) error { return f.MergeSeriesSketchesf(s, t) }
|
||||
func (f *File) MergeMeasurementsSketches(s, t estimator.Sketch) error {
|
||||
return f.MergeMeasurementsSketchesf(s, t)
|
||||
}
|
||||
func (f *File) Retain() { f.Retainf() }
|
||||
func (f *File) Release() { f.Releasef() }
|
|
@ -482,20 +482,16 @@ func (fs FileSet) HasSeries(name []byte, tags models.Tags) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// FilterNamesTags filters out any series which already exist.
|
||||
// FilterNamesTags filters out any series which already exist. It modifies the
|
||||
// provided slices of names and tags.
|
||||
func (fs FileSet) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) {
|
||||
n := len(names)
|
||||
newNames := make([][]byte, 0, n)
|
||||
newTagsSlice := make([]models.Tags, 0, n)
|
||||
|
||||
for j := 0; j < n; j++ {
|
||||
if fs.HasSeries(names[j], tagsSlice[j]) {
|
||||
continue
|
||||
newNames, newTagsSlice := names[:0], tagsSlice[:0]
|
||||
for i := 0; i < len(names); i++ {
|
||||
if !fs.HasSeries(names[i], tagsSlice[i]) {
|
||||
newNames = append(newNames, names[i])
|
||||
newTagsSlice = append(newTagsSlice, tagsSlice[i])
|
||||
}
|
||||
newNames = append(newNames, names[j])
|
||||
newTagsSlice = append(newTagsSlice, tagsSlice[j])
|
||||
}
|
||||
|
||||
return newNames, newTagsSlice
|
||||
}
|
||||
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
package tsi1_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb/index/internal"
|
||||
"github.com/influxdata/influxdb/tsdb/index/tsi1"
|
||||
)
|
||||
|
||||
// Ensure fileset can return an iterator over all series in the index.
|
||||
|
@ -263,3 +267,123 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileSet_FilterNamesTags(t *testing.T) {
|
||||
var mf internal.File
|
||||
fs := tsi1.FileSet{&mf}
|
||||
|
||||
var (
|
||||
names [][]byte
|
||||
tags []models.Tags
|
||||
)
|
||||
|
||||
reset := func() {
|
||||
names = [][]byte{[]byte("m1"), []byte("m2"), []byte("m3"), []byte("m4")}
|
||||
tags = []models.Tags{
|
||||
models.NewTags(map[string]string{"host": "server-1"}),
|
||||
models.NewTags(map[string]string{"host": "server-2"}),
|
||||
models.NewTags(map[string]string{"host": "server-3"}),
|
||||
models.NewTags(map[string]string{"host": "server-3"}),
|
||||
}
|
||||
}
|
||||
|
||||
// Filter out first name/tags in arguments.
|
||||
reset()
|
||||
mf.HasSeriesf = func(name []byte, tags models.Tags) (bool, bool) {
|
||||
return string(name) == "m1" && tags[0].String() == "{host server-1}", false
|
||||
}
|
||||
|
||||
gotNames, gotTags := fs.FilterNamesTags(names, tags)
|
||||
reset()
|
||||
if got, exp := gotNames, names[1:]; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
} else if got, exp := gotTags, tags[1:]; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
|
||||
// Filter out middle name/tags in arguments.
|
||||
reset()
|
||||
mf.HasSeriesf = func(name []byte, tags models.Tags) (bool, bool) {
|
||||
return string(name) == "m3" && tags[0].String() == "{host server-3}", false
|
||||
}
|
||||
|
||||
gotNames, gotTags = fs.FilterNamesTags(names, tags)
|
||||
reset()
|
||||
if got, exp := gotNames, append(names[:2], names[3]); !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
} else if got, exp := gotTags, append(tags[:2], tags[3]); !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
|
||||
// Filter out last name/tags in arguments.
|
||||
reset()
|
||||
mf.HasSeriesf = func(name []byte, tags models.Tags) (bool, bool) {
|
||||
return string(name) == "m4" && tags[0].String() == "{host server-3}", false
|
||||
}
|
||||
|
||||
gotNames, gotTags = fs.FilterNamesTags(names, tags)
|
||||
reset()
|
||||
if got, exp := gotNames, names[:3]; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
} else if got, exp := gotTags, tags[:3]; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
byteSliceResult [][]byte
|
||||
tagsSliceResult []models.Tags
|
||||
)
|
||||
|
||||
func BenchmarkFileset_FilterNamesTags(b *testing.B) {
|
||||
idx := MustOpenIndex()
|
||||
defer idx.Close()
|
||||
|
||||
allNames := make([][]byte, 0, 2000*1000)
|
||||
allTags := make([]models.Tags, 0, 2000*1000)
|
||||
|
||||
for i := 0; i < 2000; i++ {
|
||||
for j := 0; j < 1000; j++ {
|
||||
name := []byte(fmt.Sprintf("measurement-%d", i))
|
||||
tags := models.NewTags(map[string]string{"host": fmt.Sprintf("server-%d", j)})
|
||||
allNames = append(allNames, name)
|
||||
allTags = append(allTags, tags)
|
||||
}
|
||||
}
|
||||
|
||||
if err := idx.CreateSeriesListIfNotExists(nil, allNames, allTags); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
idx.CheckFastCompaction()
|
||||
|
||||
fs := idx.RetainFileSet()
|
||||
defer fs.Release()
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
names := [][]byte{
|
||||
[]byte("foo"),
|
||||
[]byte("measurement-222"), // filtered
|
||||
[]byte("measurement-222"), // kept (tags won't match)
|
||||
[]byte("measurements-1"),
|
||||
[]byte("measurement-900"), // filtered
|
||||
[]byte("measurement-44444"),
|
||||
[]byte("bar"),
|
||||
}
|
||||
|
||||
tags := []models.Tags{
|
||||
nil,
|
||||
models.NewTags(map[string]string{"host": "server-297"}), // filtered
|
||||
models.NewTags(map[string]string{"host": "wrong"}),
|
||||
nil,
|
||||
models.NewTags(map[string]string{"host": "server-1026"}), // filtered
|
||||
models.NewTags(map[string]string{"host": "server-23"}), // kept (measurement won't match)
|
||||
models.NewTags(map[string]string{"host": "zoo"}),
|
||||
}
|
||||
b.StartTimer()
|
||||
byteSliceResult, tagsSliceResult = fs.FilterNamesTags(names, tags)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue