From 7f244cb29fd22e5c5d1f6fd4e19a79dec5b10ddb Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 11 Jan 2018 23:07:35 +0000 Subject: [PATCH] Use models series key for partition allocation There are two series key formats: the `models` package format, which is also line-protocol format, and the `tsdb` package format, which is used by the series file when serialising series keys. When writing to a series, rather than taking a `models` format key from the `coordinator` package and then converting it to a `tsdb` package format, it would be cheaper to keep the key in the `models` format before hashing it to determine which partition the key lives in. --- tsdb/index/tsi1/index.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 52e9eb609a..a86f301cd3 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -143,7 +143,6 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet { seriesIDSet := tsdb.NewSeriesIDSet() others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN) for _, p := range i.partitions { - fmt.Printf("Partition %s: bitset to merge is %v\n", p.id, p.seriesSet) others = append(others, p.seriesSet) } seriesIDSet.Merge(others...) @@ -468,7 +467,7 @@ func (i *Index) DropMeasurement(name []byte) error { } // CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk. -func (i *Index) CreateSeriesListIfNotExists(_ [][]byte, names [][]byte, tagsSlice []models.Tags) error { +func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) error { // All slices must be of equal length. if len(names) != len(tagsSlice) { return errors.New("names/tags length mismatch in index") @@ -480,13 +479,10 @@ func (i *Index) CreateSeriesListIfNotExists(_ [][]byte, names [][]byte, tagsSlic pTags := make([][]models.Tags, i.PartitionN) // Determine partition for series using each series key. - buf := make([]byte, 2048) - for k := range names { - buf = tsdb.AppendSeriesKey(buf[:0], names[k], tagsSlice[k]) - - pidx := i.partitionIdx(buf) - pNames[pidx] = append(pNames[pidx], names[k]) - pTags[pidx] = append(pTags[pidx], tagsSlice[k]) + for ki, key := range keys { + pidx := i.partitionIdx(key) + pNames[pidx] = append(pNames[pidx], names[ki]) + pTags[pidx] = append(pTags[pidx], tagsSlice[ki]) } // Process each subset of series on each partition. @@ -530,11 +526,10 @@ func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error { // DropSeries drops the provided series from the index. func (i *Index) DropSeries(key []byte, ts int64) error { // Extract measurement name. - name, tags := models.ParseKeyBytes(key) - partitionKey := tsdb.AppendSeriesKey(nil, name, tags) + name, _ := models.ParseKeyBytes(key) // Remove from partition. - if err := i.partition(partitionKey).DropSeries(key, ts); err != nil { + if err := i.partition(key).DropSeries(key, ts); err != nil { return err }