Merge pull request #10190 from influxdata/er-tagsets

Reduce allocations in TSI TagSets implementation
Edd Robinson 2018-08-10 16:52:23 +01:00 committed by GitHub
commit b7f1097176
5 changed files with 319 additions and 9 deletions


@@ -2548,6 +2548,11 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
maxSeriesN = int(^uint(0) >> 1)
}
+ // The tag sets require a string for each series key in the set. The series
+ // file formatted keys need to be parsed into models format. Since they will
+ // end up as strings we can re-use an intermediate buffer for this process.
+ var keyBuf []byte
+ var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.
for {
se, err := itr.Next()
if err != nil {
@@ -2575,14 +2580,15 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
}
- _, tags := ParseSeriesKey(key)
- if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tags) {
+ // NOTE - must not escape this loop iteration.
+ _, tagsBuf = ParseSeriesKeyInto(key, tagsBuf)
+ if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) {
continue
}
var tagsAsKey []byte
if len(dims) > 0 {
- tagsAsKey = MakeTagsKey(dims, tags)
+ tagsAsKey = MakeTagsKey(dims, tagsBuf)
}
tagSet, ok := tagSets[string(tagsAsKey)]
@@ -2595,7 +2601,9 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
}
// Associate the series and filter with the Tagset.
- tagSet.AddFilter(string(models.MakeKey(name, tags)), se.Expr)
+ keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
+ tagSet.AddFilter(string(keyBuf), se.Expr)
+ keyBuf = keyBuf[:0]
// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
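
The comment in the first hunk is the heart of this change: converting a []byte to a string copies it, so one scratch buffer can be truncated and refilled on every iteration. A minimal self-contained sketch of that idiom (illustrative only, not code from the commit):

	package main

	import "fmt"

	func main() {
		series := [][]byte{[]byte("cpu,host=a"), []byte("cpu,host=b")}
		filters := make(map[string]bool)
		var keyBuf []byte // reused across iterations, like keyBuf above
		for _, s := range series {
			keyBuf = append(keyBuf, s...)  // build the key in the shared buffer
			filters[string(keyBuf)] = true // string(...) copies, so reuse is safe
			keyBuf = keyBuf[:0]            // truncate; capacity is retained
		}
		fmt.Println(len(filters)) // 2
	}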


@@ -1,17 +1,20 @@
package tsdb_test
import (
"compress/gzip"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
@@ -313,7 +316,6 @@ func MustNewIndex(index string) *Index {
opts.IndexVersion = index
rootPath, err := ioutil.TempDir("", "influxdb-tsdb")
- fmt.Println(rootPath)
if err != nil {
panic(err)
}
@@ -416,3 +418,244 @@ func (i *Index) Close() error {
//return os.RemoveAll(i.rootPath)
return nil
}
// This benchmark compares the TagSets implementation across index types.
//
// In the case of the TSI index, TagSets has to merge results across all of the
// index's partitions.
//
// Typical results on an i7 laptop.
//
// BenchmarkIndexSet_TagSets/1M_series/inmem-8 100 10430732 ns/op 3556728 B/op 51 allocs/op
// BenchmarkIndexSet_TagSets/1M_series/tsi1-8 100 18995530 ns/op 5221180 B/op 20379 allocs/op
func BenchmarkIndexSet_TagSets(b *testing.B) {
// Read line-protocol and coerce into tsdb format.
keys := make([][]byte, 0, 1e6)
names := make([][]byte, 0, 1e6)
tags := make([]models.Tags, 0, 1e6)
// 1M series generated with:
// $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
fd, err := os.Open("testdata/line-protocol-1M.txt.gz")
if err != nil {
b.Fatal(err)
}
gzr, err := gzip.NewReader(fd)
if err != nil {
fd.Close()
b.Fatal(err)
}
data, err := ioutil.ReadAll(gzr)
if err != nil {
b.Fatal(err)
}
if err := fd.Close(); err != nil {
b.Fatal(err)
}
points, err := models.ParsePoints(data)
if err != nil {
b.Fatal(err)
}
for _, pt := range points {
keys = append(keys, pt.Key())
names = append(names, pt.Name())
tags = append(tags, pt.Tags())
}
// setup writes all of the above points to the index.
setup := func(idx *Index) {
batchSize := 10000
for j := 0; j < 1; j++ {
for i := 0; i < len(keys); i += batchSize {
k := keys[i : i+batchSize]
n := names[i : i+batchSize]
t := tags[i : i+batchSize]
if err := idx.CreateSeriesListIfNotExists(k, n, t); err != nil {
b.Fatal(err)
}
}
}
}
// TODO(edd): refactor how we call into tag sets in the tsdb package.
type indexTagSets interface {
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
}
var errResult error
// This benchmark will merge eight bitsets each containing ~10,000 series IDs.
b.Run("1M series", func(b *testing.B) {
b.ReportAllocs()
for _, indexType := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(indexType)
setup(idx)
name := []byte("m4")
opt := query.IteratorOptions{Condition: influxql.MustParseExpr(`"tag5"::tag = 'value0'`)}
indexSet := tsdb.IndexSet{
SeriesFile: idx.sfile,
Indexes: []tsdb.Index{idx.Index},
} // Only used by the TSI implementation.
var ts func() ([]*query.TagSet, error)
// TODO(edd): this is somewhat awkward. We should unify this difference somewhere
// higher up than the engine. I don't want to have to open an engine to run a
// benchmark on different index implementations.
if indexType == "inmem" {
ts = func() ([]*query.TagSet, error) {
return idx.Index.(indexTagSets).TagSets(name, opt)
}
} else {
ts = func() ([]*query.TagSet, error) {
return indexSet.TagSets(idx.sfile, name, opt)
}
}
b.Run(indexType, func(b *testing.B) {
for i := 0; i < b.N; i++ {
// Will call TagSets on the appropriate implementation.
_, errResult = ts()
if errResult != nil {
b.Fatal(errResult)
}
}
})
if err := idx.Close(); err != nil {
b.Fatal(err)
}
}
})
}
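
A usage note: the B/op and allocs/op columns quoted in the comment above come from the -benchmem flag. Assuming the standard package layout, something like

	go test -run '^$' -bench BenchmarkIndexSet_TagSets -benchmem ./tsdb/

runs only this benchmark (the -run '^$' pattern skips the regular tests).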
// This benchmark concurrently writes series to the index and fetches cached bitsets.
// The idea is to emphasize the performance difference when bitset caching is on and off.
//
// Typical results on an i7 laptop.
//
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000-8 1 5866592461 ns/op 2499768464 B/op 23964591 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000-8 1 30059490078 ns/op 32582973824 B/op 96705317 allocs/op
func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
// Read line-protocol and coerce into tsdb format.
keys := make([][]byte, 0, 1e6)
names := make([][]byte, 0, 1e6)
tags := make([]models.Tags, 0, 1e6)
// 1M series generated with:
// $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
fd, err := os.Open("testdata/line-protocol-1M.txt.gz")
if err != nil {
b.Fatal(err)
}
gzr, err := gzip.NewReader(fd)
if err != nil {
fd.Close()
b.Fatal(err)
}
data, err := ioutil.ReadAll(gzr)
if err != nil {
b.Fatal(err)
}
if err := fd.Close(); err != nil {
b.Fatal(err)
}
points, err := models.ParsePoints(data)
if err != nil {
b.Fatal(err)
}
for _, pt := range points {
keys = append(keys, pt.Key())
names = append(names, pt.Name())
tags = append(tags, pt.Tags())
}
runBenchmark := func(b *testing.B, index string, queryN int) {
idx := MustOpenNewIndex(index)
var wg sync.WaitGroup
begin := make(chan struct{})
// Run concurrent iterator...
runIter := func() {
keys := [][]string{
{"m0", "tag2", "value4"},
{"m1", "tag3", "value5"},
{"m2", "tag4", "value6"},
{"m3", "tag0", "value8"},
{"m4", "tag5", "value0"},
}
<-begin // Wait for writes to land
for i := 0; i < queryN/5; i++ {
for _, key := range keys {
itr, err := idx.TagValueSeriesIDIterator([]byte(key[0]), []byte(key[1]), []byte(key[2]))
if err != nil {
b.Fatal(err)
}
if itr == nil {
panic("should not happen")
}
if err := itr.Close(); err != nil {
b.Fatal(err)
}
}
}
}
batchSize := 10000
wg.Add(1)
go func() { defer wg.Done(); runIter() }()
var once sync.Once
for j := 0; j < b.N; j++ {
for i := 0; i < len(keys); i += batchSize {
k := keys[i : i+batchSize]
n := names[i : i+batchSize]
t := tags[i : i+batchSize]
if err := idx.CreateSeriesListIfNotExists(k, n, t); err != nil {
b.Fatal(err)
}
once.Do(func() { close(begin) })
}
// Wait for queries to finish
wg.Wait()
// Reset the index...
b.StopTimer()
if err := idx.Close(); err != nil {
b.Fatal(err)
}
// Re-open everything
idx = MustOpenNewIndex(index)
wg.Add(1)
begin = make(chan struct{})
once = sync.Once{}
go func() { defer wg.Done(); runIter() }()
b.StartTimer()
}
}
queries := []int{1e5}
for _, indexType := range tsdb.RegisteredIndexes() {
b.Run(indexType, func(b *testing.B) {
for _, queryN := range queries {
b.Run(fmt.Sprintf("queries %d", queryN), func(b *testing.B) {
runBenchmark(b, indexType, queryN)
})
}
})
}
}
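
The coordination in runBenchmark deserves a note: the reader goroutine blocks on the begin channel until the first write batch has landed, and sync.Once guarantees the channel is closed exactly once even though the batching loop keeps iterating. A stripped-down sketch of just that pattern (illustrative, not the benchmark itself):

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		var wg sync.WaitGroup
		begin := make(chan struct{})
		var once sync.Once

		wg.Add(1)
		go func() { // stands in for runIter
			defer wg.Done()
			<-begin // wait for the first batch of writes to land
			fmt.Println("reader running")
		}()

		for batch := 0; batch < 3; batch++ {
			// ... write one batch of series here ...
			once.Do(func() { close(begin) }) // release the reader exactly once
		}
		wg.Wait()
	}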


@@ -355,18 +355,39 @@ func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) {
// ParseSeriesKey extracts the name & tags from a series key.
func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) {
+ return parseSeriesKey(data, nil)
+ }
+ // ParseSeriesKeyInto extracts the name and tags from data, parsing the tags
+ // into dstTags, which is then returned.
+ //
+ // The returned dstTags may have a different length and capacity.
+ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {
+ return parseSeriesKey(data, dstTags)
+ }
+ // parseSeriesKey extracts the name and tags from data, attempting to re-use the
+ // provided tags value rather than allocating. The returned tags may have a
+ // different length and capacity to those provided.
+ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
+ var name []byte
_, data = ReadSeriesKeyLen(data)
name, data = ReadSeriesKeyMeasurement(data)
tagN, data := ReadSeriesKeyTagN(data)
- tags = make(models.Tags, tagN)
+ dst = dst[:cap(dst)] // Grow dst to use full capacity
+ if got, want := len(dst), tagN; got < want {
+ dst = append(dst, make(models.Tags, want-got)...)
+ }
+ dst = dst[:tagN]
for i := 0; i < tagN; i++ {
var key, value []byte
key, value, data = ReadSeriesKeyTag(data)
- tags[i] = models.Tag{Key: key, Value: value}
+ dst[i].Key, dst[i].Value = key, value
}
- return name, tags
+ return name, dst
}
func CompareSeriesKeys(a, b []byte) int {
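
The dst resizing in parseSeriesKey is a general idiom for recycling a slice's backing array: reopen the slice to its full capacity, append zero values only for the shortfall, then truncate to the exact length required. A self-contained sketch with a hypothetical growTags helper (not part of the commit):

	package main

	import "fmt"

	type tag struct{ key, value []byte }

	// growTags returns dst resized to exactly n elements, reusing the backing
	// array where possible and allocating only when capacity falls short.
	func growTags(dst []tag, n int) []tag {
		dst = dst[:cap(dst)] // expose the full capacity
		if len(dst) < n {
			dst = append(dst, make([]tag, n-len(dst))...)
		}
		return dst[:n]
	}

	func main() {
		dst := make([]tag, 1, 8)
		dst = growTags(dst, 3)
		fmt.Println(len(dst), cap(dst)) // 3 8: no reallocation was needed
	}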


@@ -1,6 +1,7 @@
package tsdb_test
import (
"bytes"
"fmt"
"io/ioutil"
"os"
@@ -11,6 +12,43 @@
"github.com/influxdata/influxdb/tsdb"
)
func TestParseSeriesKeyInto(t *testing.T) {
name := []byte("cpu")
tags := models.NewTags(map[string]string{"region": "east", "server": "a"})
key := tsdb.AppendSeriesKey(nil, name, tags)
dst := make(models.Tags, 0)
gotName, gotTags := tsdb.ParseSeriesKeyInto(key, dst)
if !bytes.Equal(gotName, name) {
t.Fatalf("got %q, expected %q", gotName, name)
}
if got, exp := len(gotTags), 2; got != exp {
t.Fatalf("got tags length %d, expected %d", got, exp)
} else if got, exp := gotTags, tags; !got.Equal(exp) {
t.Fatalf("got tags %v, expected %v", got, exp)
}
dst = make(models.Tags, 0, 5)
_, gotTags = tsdb.ParseSeriesKeyInto(key, dst)
if got, exp := len(gotTags), 2; got != exp {
t.Fatalf("got tags length %d, expected %d", got, exp)
} else if got, exp := cap(gotTags), 5; got != exp {
t.Fatalf("got tags capacity %d, expected %d", got, exp)
} else if got, exp := gotTags, tags; !got.Equal(exp) {
t.Fatalf("got tags %v, expected %v", got, exp)
}
dst = make(models.Tags, 1)
_, gotTags = tsdb.ParseSeriesKeyInto(key, dst)
if got, exp := len(gotTags), 2; got != exp {
t.Fatalf("got tags length %d, expected %d", got, exp)
} else if got, exp := gotTags, tags; !got.Equal(exp) {
t.Fatalf("got tags %v, expected %v", got, exp)
}
}
// Ensure series file contains the correct set of series.
func TestSeriesFile_Series(t *testing.T) {
sfile := MustOpenSeriesFile()

tsdb/testdata/line-protocol-1M.txt.gz vendored (new binary file, not shown)