Merge pull request #8431 from influxdata/sgc-tagsets

improvements to inmem/Measurement.TagSets API
pull/8433/head
Stuart Carnie 2017-05-26 10:29:56 -07:00 committed by GitHub
commit 3b08f65a20
4 changed files with 175 additions and 38 deletions

View File

@ -308,6 +308,13 @@ func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*
return nil, err
}
var dims []string
if len(opt.Dimensions) > 0 {
dims = make([]string, len(opt.Dimensions))
copy(dims, opt.Dimensions)
sort.Strings(dims)
}
m.mu.RLock()
// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
// TagSet for that series. Series with the same TagSet are then grouped together, because for the
@ -330,30 +337,24 @@ func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*
if !s.Assigned(shardID) {
continue
}
tags := make(map[string]string, len(opt.Dimensions))
// Build the TagSet for this series.
for _, dim := range opt.Dimensions {
tags[dim] = s.GetTagString(dim)
var tagsAsKey []byte
if len(dims) > 0 {
tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags())
}
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
// as a set.
tagsAsKey := tsdb.MarshalTags(tags)
tagSet, ok := tagSets[string(tagsAsKey)]
if !ok {
// This TagSet is new, create a new entry for it.
tagSet = &influxql.TagSet{
Tags: tags,
Tags: nil,
Key: tagsAsKey,
}
tagSets[string(tagsAsKey)] = tagSet
}
// Associate the series and filter with the Tagset.
tagSet.AddFilter(m.seriesByID[id].Key, filters[id])
seriesN++
// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
}
// Release the lock while we sort all the tags
m.mu.RUnlock()

View File

@ -221,35 +221,38 @@ func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
}
/*
func BenchmarkCreateSeriesIndex_1K(b *testing.B) {
benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3))
}
func BenchmarkCreateSeriesIndex_100K(b *testing.B) {
benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5))
}
func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
benchmarkCreateSeriesIndex(b, genTestSeries(330, 5, 5))
}
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
idxs := make([]*inmem.DatabaseIndex, 0, b.N)
for i := 0; i < b.N; i++ {
index, err := inmem.NewDatabaseIndex(fmt.Sprintf("db%d", i))
if err != nil {
b.Fatal(err)
}
idxs = append(idxs, index)
func benchmarkTagSets(b *testing.B, n int, opt influxql.IteratorOptions) {
m := inmem.NewMeasurement("m")
for i := 0; i < n; i++ {
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
s := inmem.NewSeries([]byte(fmt.Sprintf("m,tag1=value1,tag2=value2")), models.NewTags(tags))
s.ID = uint64(i)
s.AssignShard(0)
m.AddSeries(s)
}
// warm caches
m.TagSets(0, opt)
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
idx := idxs[n]
for _, s := range series {
idx.CreateSeriesIndexIfNotExists(s.Measurement, s.Series, false)
}
for i := 0; i < b.N; i++ {
m.TagSets(0, opt)
}
}
*/
func BenchmarkMeasurement_TagSetsNoDimensions_1000(b *testing.B) {
benchmarkTagSets(b, 1000, influxql.IteratorOptions{})
}
func BenchmarkMeasurement_TagSetsDimensions_1000(b *testing.B) {
benchmarkTagSets(b, 1000, influxql.IteratorOptions{Dimensions: []string{"tag1", "tag2"}})
}
func BenchmarkMeasurement_TagSetsNoDimensions_100000(b *testing.B) {
benchmarkTagSets(b, 100000, influxql.IteratorOptions{})
}
func BenchmarkMeasurement_TagSetsDimensions_100000(b *testing.B) {
benchmarkTagSets(b, 100000, influxql.IteratorOptions{Dimensions: []string{"tag1", "tag2"}})
}

View File

@ -44,6 +44,60 @@ func MarshalTags(tags map[string]string) []byte {
return b
}
// MakeTagsKey builds a lookup key from the tags selected by keys.
//
// Both keys and tags must already be sorted; the function walks them in
// lockstep and does not check this. The key consists of every matched tag key
// followed by every matched tag value, with '|' between fields, e.g.
// "k1|k2|v1|v2". It returns nil when keys or tags is empty or when none of the
// requested keys is present in tags.
//
// NOTE: '|' bytes occurring inside a tag key or value are copied verbatim and
// are not escaped.
func MakeTagsKey(keys []string, tags models.Tags) []byte {
	if len(keys) == 0 || len(tags) == 0 {
		return nil
	}

	// Merge-walk the two sorted lists, recording the index of each tag whose
	// key was requested together with the bytes it contributes to the result.
	matched := make([]int, 0, len(keys))
	size := 0
	for ki, ti := 0, 0; ki < len(keys) && ti < len(tags); {
		switch {
		case keys[ki] < string(tags[ti].Key):
			ki++
		case keys[ki] > string(tags[ti].Key):
			ti++
		default:
			matched = append(matched, ti)
			size += len(keys[ki]) + len(tags[ti].Value)
			ki++
			ti++
		}
	}
	if len(matched) == 0 {
		// None of the requested keys exist on this tag set.
		return nil
	}

	// One separator follows every key and one sits between adjacent values.
	size += 2*len(matched) - 1

	// Emit all keys first, then all values.
	key := make([]byte, 0, size)
	for _, ti := range matched {
		key = append(key, tags[ti].Key...)
		key = append(key, '|')
	}
	for n, ti := range matched {
		if n > 0 {
			key = append(key, '|')
		}
		key = append(key, tags[ti].Value...)
	}
	return key
}
// MeasurementFromSeriesKey returns the name of the measurement from a key that
// contains a measurement name.
func MeasurementFromSeriesKey(key []byte) []byte {

View File

@ -61,6 +61,85 @@ func benchmarkMarshalTags(b *testing.B, keyN int) {
}
}
// TestMakeTagsKey checks the key produced by tsdb.MakeTagsKey for a table of
// key/tag combinations: empty inputs, requested keys that are absent from the
// tags, and full or partial overlap between the two.
func TestMakeTagsKey(t *testing.T) {
	type testCase struct {
		keys   []string
		tags   models.Tags
		result []byte
	}

	cases := []testCase{
		{
			keys:   nil,
			tags:   nil,
			result: nil,
		},
		{
			keys:   []string{"foo"},
			tags:   models.NewTags(map[string]string{"foo": "bar"}),
			result: []byte(`foo|bar`),
		},
		{
			keys:   []string{"foo"},
			tags:   models.NewTags(map[string]string{"baz": "battttt"}),
			result: []byte(``),
		},
		{
			keys:   []string{"baz", "foo"},
			tags:   models.NewTags(map[string]string{"baz": "battttt"}),
			result: []byte(`baz|battttt`),
		},
		{
			keys:   []string{"baz", "foo", "zzz"},
			tags:   models.NewTags(map[string]string{"foo": "bar"}),
			result: []byte(`foo|bar`),
		},
		{
			keys:   []string{"baz", "foo"},
			tags:   models.NewTags(map[string]string{"foo": "bar", "baz": "battttt"}),
			result: []byte(`baz|foo|battttt|bar`),
		},
		{
			keys:   []string{"baz"},
			tags:   models.NewTags(map[string]string{"baz": "battttt", "foo": "bar"}),
			result: []byte(`baz|battttt`),
		},
	}

	for i := range cases {
		tc := cases[i]
		got := tsdb.MakeTagsKey(tc.keys, tc.tags)
		// bytes.Equal treats a nil slice and an empty slice as equal, so the
		// empty expectation above also accepts a nil result.
		if bytes.Equal(got, tc.result) {
			continue
		}
		t.Fatalf("%d. unexpected result: exp=%s, got=%s", i, tc.result, got)
	}
}
// BenchmarkMakeTagsKey_KeyN1 runs benchmarkMakeTagsKey with keyN = 1.
func BenchmarkMakeTagsKey_KeyN1(b *testing.B) {
	const keyN = 1
	benchmarkMakeTagsKey(b, keyN)
}
// BenchmarkMakeTagsKey_KeyN3 runs benchmarkMakeTagsKey with keyN = 3.
func BenchmarkMakeTagsKey_KeyN3(b *testing.B) {
	const keyN = 3
	benchmarkMakeTagsKey(b, keyN)
}
// BenchmarkMakeTagsKey_KeyN5 runs benchmarkMakeTagsKey with keyN = 5.
func BenchmarkMakeTagsKey_KeyN5(b *testing.B) {
	const keyN = 5
	benchmarkMakeTagsKey(b, keyN)
}
// BenchmarkMakeTagsKey_KeyN10 runs benchmarkMakeTagsKey with keyN = 10.
func BenchmarkMakeTagsKey_KeyN10(b *testing.B) {
	const keyN = 10
	benchmarkMakeTagsKey(b, keyN)
}
// makeTagsAndKeys generates keyN tag keys and a models.Tags holding one tag
// per key. Key i is i zero-padded to keySize digits and its value is i
// zero-padded to valueSize digits. The keys are returned in generation order.
func makeTagsAndKeys(keyN int) ([]string, models.Tags) {
	const keySize, valueSize = 8, 15

	names := make([]string, 0, keyN)
	tagMap := make(map[string]string)
	for n := 0; n < keyN; n++ {
		name := fmt.Sprintf("%0*d", keySize, n)
		names = append(names, name)
		tagMap[name] = fmt.Sprintf("%0*d", valueSize, n)
	}

	return names, models.NewTags(tagMap)
}
// benchmarkMakeTagsKey measures tsdb.MakeTagsKey on the keyN keys and tags
// produced by makeTagsAndKeys, reporting allocations per operation.
func benchmarkMakeTagsKey(b *testing.B, keyN int) {
	keys, tags := makeTagsAndKeys(keyN)

	b.ReportAllocs()
	// Exclude the fixture construction above from the measured time, matching
	// how benchmarkTagSets resets the timer after its setup.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		tsdb.MakeTagsKey(keys, tags)
	}
}
type TestSeries struct {
Measurement string
Series *inmem.Series