Use models series key for partition allocation
There are two series key formats: the `models` package format, which is also line-protocol format, and the `tsdb` package format, which is used by the series file when serialising series keys. When writing to a series, rather than taking a `models` format key from the `coordinator` package and then converting it to a `tsdb` package format, it would be cheaper to keep the key in the `models` format before hashing it to determine which partition the key lives in.pull/9315/head
parent
a4bef3a4bc
commit
7f244cb29f
|
@ -143,7 +143,6 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
|
|||
seriesIDSet := tsdb.NewSeriesIDSet()
|
||||
others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN)
|
||||
for _, p := range i.partitions {
|
||||
fmt.Printf("Partition %s: bitset to merge is %v\n", p.id, p.seriesSet)
|
||||
others = append(others, p.seriesSet)
|
||||
}
|
||||
seriesIDSet.Merge(others...)
|
||||
|
@ -468,7 +467,7 @@ func (i *Index) DropMeasurement(name []byte) error {
|
|||
}
|
||||
|
||||
// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
|
||||
func (i *Index) CreateSeriesListIfNotExists(_ [][]byte, names [][]byte, tagsSlice []models.Tags) error {
|
||||
func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) error {
|
||||
// All slices must be of equal length.
|
||||
if len(names) != len(tagsSlice) {
|
||||
return errors.New("names/tags length mismatch in index")
|
||||
|
@ -480,13 +479,10 @@ func (i *Index) CreateSeriesListIfNotExists(_ [][]byte, names [][]byte, tagsSlic
|
|||
pTags := make([][]models.Tags, i.PartitionN)
|
||||
|
||||
// Determine partition for series using each series key.
|
||||
buf := make([]byte, 2048)
|
||||
for k := range names {
|
||||
buf = tsdb.AppendSeriesKey(buf[:0], names[k], tagsSlice[k])
|
||||
|
||||
pidx := i.partitionIdx(buf)
|
||||
pNames[pidx] = append(pNames[pidx], names[k])
|
||||
pTags[pidx] = append(pTags[pidx], tagsSlice[k])
|
||||
for ki, key := range keys {
|
||||
pidx := i.partitionIdx(key)
|
||||
pNames[pidx] = append(pNames[pidx], names[ki])
|
||||
pTags[pidx] = append(pTags[pidx], tagsSlice[ki])
|
||||
}
|
||||
|
||||
// Process each subset of series on each partition.
|
||||
|
@ -530,11 +526,10 @@ func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
|
|||
// DropSeries drops the provided series from the index.
|
||||
func (i *Index) DropSeries(key []byte, ts int64) error {
|
||||
// Extract measurement name.
|
||||
name, tags := models.ParseKeyBytes(key)
|
||||
partitionKey := tsdb.AppendSeriesKey(nil, name, tags)
|
||||
name, _ := models.ParseKeyBytes(key)
|
||||
|
||||
// Remove from partition.
|
||||
if err := i.partition(partitionKey).DropSeries(key, ts); err != nil {
|
||||
if err := i.partition(key).DropSeries(key, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue