influxdb/tsdb/series_file.go

539 lines
14 KiB
Go
Raw Permalink Normal View History

2017-11-15 23:09:25 +00:00
package tsdb
2017-09-14 15:41:58 +00:00
import (
"bytes"
"encoding/binary"
2018-01-08 16:11:29 +00:00
"errors"
2017-09-14 15:41:58 +00:00
"fmt"
"os"
2017-11-15 23:09:25 +00:00
"path/filepath"
"runtime"
2018-01-09 19:05:37 +00:00
"sort"
"sync"
2017-09-14 15:41:58 +00:00
2018-01-09 19:05:37 +00:00
"github.com/cespare/xxhash"
2017-09-14 15:41:58 +00:00
"github.com/influxdata/influxdb/models"
2018-01-09 19:05:37 +00:00
"github.com/influxdata/influxdb/pkg/binaryutil"
"github.com/influxdata/influxdb/pkg/limiter"
2018-01-02 19:20:03 +00:00
"go.uber.org/zap"
2018-01-09 19:05:37 +00:00
"golang.org/x/sync/errgroup"
2017-09-14 15:41:58 +00:00
)
2018-01-08 16:11:29 +00:00
// Sentinel errors returned by series file operations.
var (
	// ErrSeriesFileClosed is the error value for operations on a closed series file.
	ErrSeriesFileClosed = errors.New("tsdb: series file closed")

	// ErrInvalidSeriesPartitionID is returned when a series ID does not map
	// to an available partition.
	ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")
)
2017-11-30 17:23:03 +00:00
// SeriesIDSize is the size in bytes of a series key ID.
const SeriesIDSize = 8
2017-09-14 15:41:58 +00:00
2018-01-09 19:05:37 +00:00
// SeriesFilePartitionN is the number of partitions a series file is split into.
const SeriesFilePartitionN = 8
2017-10-02 14:07:11 +00:00
2017-09-14 15:41:58 +00:00
// SeriesFile represents the section of the index that holds series data.
type SeriesFile struct {
2018-01-09 19:05:37 +00:00
path string
partitions []*SeriesPartition
2018-01-02 19:20:03 +00:00
maxSnapshotConcurrency int
refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.
2018-01-02 19:20:03 +00:00
Logger *zap.Logger
2017-09-14 15:41:58 +00:00
}
// NewSeriesFile returns a new instance of SeriesFile.
func NewSeriesFile(path string) *SeriesFile {
maxSnapshotConcurrency := runtime.GOMAXPROCS(0)
if maxSnapshotConcurrency < 1 {
maxSnapshotConcurrency = 1
}
2017-09-14 15:41:58 +00:00
return &SeriesFile{
path: path,
maxSnapshotConcurrency: maxSnapshotConcurrency,
Logger: zap.NewNop(),
2017-09-14 15:41:58 +00:00
}
}
// WithMaxCompactionConcurrency sets the concurrency used to size the
// compaction limiter created by Open. Values below 1 fall back to GOMAXPROCS,
// and to 1 if that is also below 1.
func (f *SeriesFile) WithMaxCompactionConcurrency(maxCompactionConcurrency int) {
	limit := maxCompactionConcurrency
	if limit < 1 {
		if limit = runtime.GOMAXPROCS(0); limit < 1 {
			limit = 1
		}
	}
	f.maxSnapshotConcurrency = limit
}
2017-09-14 15:41:58 +00:00
// Open memory maps the data file at the file's path.
func (f *SeriesFile) Open() error {
// Wait for all references to be released and prevent new ones from being acquired.
f.refs.Lock()
defer f.refs.Unlock()
2017-12-29 18:57:30 +00:00
// Create path if it doesn't exist.
if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil {
2017-11-15 23:09:25 +00:00
return err
}
// Limit concurrent series file compactions
compactionLimiter := limiter.NewFixed(f.maxSnapshotConcurrency)
2018-01-09 19:05:37 +00:00
// Open partitions.
f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
for i := 0; i < SeriesFilePartitionN; i++ {
p := NewSeriesPartition(i, f.SeriesPartitionPath(i), compactionLimiter)
2018-01-09 19:05:37 +00:00
p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
if err := p.Open(); err != nil {
2019-02-06 08:10:51 +00:00
f.Logger.Error("Unable to open series file",
2018-12-06 14:58:05 +00:00
zap.String("path", f.path),
zap.Int("partition", p.ID()),
zap.Error(err))
2019-02-06 15:27:59 +00:00
f.close()
2017-12-19 17:31:33 +00:00
return err
}
2018-01-09 19:05:37 +00:00
f.partitions = append(f.partitions, p)
2017-09-14 15:41:58 +00:00
}
return nil
}
2019-02-06 08:10:51 +00:00
func (f *SeriesFile) close() (err error) {
2018-01-09 19:05:37 +00:00
for _, p := range f.partitions {
if e := p.Close(); e != nil && err == nil {
2017-12-29 18:57:30 +00:00
err = e
}
2017-09-14 15:41:58 +00:00
}
2017-12-29 18:57:30 +00:00
return err
2017-09-14 15:41:58 +00:00
}
// Close unmaps the data file.
func (f *SeriesFile) Close() (err error) {
2019-02-06 08:10:51 +00:00
f.refs.Lock()
defer f.refs.Unlock()
return f.close()
}
2017-09-14 15:41:58 +00:00
// Path reports the path the series file was created with.
func (f *SeriesFile) Path() string {
	return f.path
}
2018-01-09 19:05:37 +00:00
// SeriesPartitionPath builds the path of partition i: the series file path
// joined with the partition index formatted as two lower-case hex digits.
func (f *SeriesFile) SeriesPartitionPath(i int) string {
	name := fmt.Sprintf("%02x", i)
	return filepath.Join(f.path, name)
}
// Partitions exposes the partition slice held by the series file. The slice
// is returned as-is, not copied.
func (f *SeriesFile) Partitions() []*SeriesPartition {
	return f.partitions
}
2017-12-29 18:57:30 +00:00
// Retain registers a reference to the file by taking a read lock on refs and
// hands back the function that releases it. On a nil receiver nothing is
// locked and a no-op release func is returned.
func (f *SeriesFile) Retain() func() {
	if f == nil {
		return nop
	}

	f.refs.RLock()
	// The matching RUnlock is the release func the caller invokes when done.
	return f.refs.RUnlock
}
// EnableCompactions turns compactions on for every partition.
func (f *SeriesFile) EnableCompactions() {
	for i := range f.partitions {
		f.partitions[i].EnableCompactions()
	}
}
// DisableCompactions stops new compactions from running on every partition.
func (f *SeriesFile) DisableCompactions() {
	for i := range f.partitions {
		f.partitions[i].DisableCompactions()
	}
}
// Wait blocks until every reference handed out by Retain has been released.
func (f *SeriesFile) Wait() {
	// Acquiring the write lock only succeeds once all read locks are gone;
	// the critical section is intentionally empty.
	f.refs.Lock()
	f.refs.Unlock()
}
// FileSize sums the sizes, in bytes, reported by all partitions. If a
// partition reports an error, summing stops and the running total (including
// whatever size that partition returned) is returned with the error.
func (f *SeriesFile) FileSize() (n int64, err error) {
	for i := range f.partitions {
		sz, e := f.partitions[i].FileSize()
		n += sz
		if e != nil {
			return n, e
		}
	}
	return n, nil
}
2018-01-03 19:19:02 +00:00
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The returned ids slice returns IDs for every name+tags, creating new series IDs as needed.
feat: series creation ingress metrics (#20700) After turning this on and testing locally, note the 'seriesCreated' metric "localStore": {"name":"localStore","tags":null,"values":{"pointsWritten":2987,"seriesCreated":58,"valuesWritten":23754}}, "ingress": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"cq","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":4}}, "ingress:1": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"database","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":4}}, "ingress:2": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"httpd","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":46}}, "ingress:3": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"ingress","rp":"monitor"},"values":{"pointsWritten":14,"seriesCreated":14,"valuesWritten":42}}, "ingress:4": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"localStore","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":6}}, "ingress:5": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"queryExecutor","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":10}}, "ingress:6": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"runtime","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":30}}, "ingress:7": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"shard","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":22}}, "ingress:8": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"subscriber","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":6}}, 
"ingress:9": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_cache","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":18}}, "ingress:10": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_engine","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":58}}, "ingress:11": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_filestore","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":4}}, "ingress:12": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_wal","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":8}}, "ingress:13": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"write","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":18}}, "ingress:14": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"cpu","rp":"autogen"},"values":{"pointsWritten":1342,"seriesCreated":13,"valuesWritten":13420}}, "ingress:15": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"disk","rp":"autogen"},"values":{"pointsWritten":642,"seriesCreated":6,"valuesWritten":4494}}, "ingress:16": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"diskio","rp":"autogen"},"values":{"pointsWritten":214,"seriesCreated":2,"valuesWritten":2354}}, "ingress:17": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"mem","rp":"autogen"},"values":{"pointsWritten":107,"seriesCreated":1,"valuesWritten":963}}, "ingress:18": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"processes","rp":"autogen"},"values":{"pointsWritten":107,"seriesCreated":1,"valuesWritten":856}}, "ingress:19": 
{"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"swap","rp":"autogen"},"values":{"pointsWritten":214,"seriesCreated":1,"valuesWritten":642}}, "ingress:20": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"system","rp":"autogen"},"values":{"pointsWritten":321,"seriesCreated":1,"valuesWritten":749}}, Closes: https://github.com/influxdata/influxdb/issues/20613
2021-02-05 18:52:43 +00:00
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, tracker StatsTracker) ([]uint64, error) {
2018-01-09 19:05:37 +00:00
keys := GenerateSeriesKeys(names, tagsSlice)
keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
ids := make([]uint64, len(keys))
2018-01-09 19:05:37 +00:00
var g errgroup.Group
for i := range f.partitions {
p := f.partitions[i]
g.Go(func() error {
feat: series creation ingress metrics (#20700) After turning this on and testing locally, note the 'seriesCreated' metric "localStore": {"name":"localStore","tags":null,"values":{"pointsWritten":2987,"seriesCreated":58,"valuesWritten":23754}}, "ingress": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"cq","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":4}}, "ingress:1": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"database","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":4}}, "ingress:2": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"httpd","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":46}}, "ingress:3": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"ingress","rp":"monitor"},"values":{"pointsWritten":14,"seriesCreated":14,"valuesWritten":42}}, "ingress:4": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"localStore","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":6}}, "ingress:5": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"queryExecutor","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":10}}, "ingress:6": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"runtime","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":30}}, "ingress:7": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"shard","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":22}}, "ingress:8": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"subscriber","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":6}}, 
"ingress:9": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_cache","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":18}}, "ingress:10": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_engine","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":58}}, "ingress:11": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_filestore","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":4}}, "ingress:12": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_wal","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":8}}, "ingress:13": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"write","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":18}}, "ingress:14": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"cpu","rp":"autogen"},"values":{"pointsWritten":1342,"seriesCreated":13,"valuesWritten":13420}}, "ingress:15": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"disk","rp":"autogen"},"values":{"pointsWritten":642,"seriesCreated":6,"valuesWritten":4494}}, "ingress:16": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"diskio","rp":"autogen"},"values":{"pointsWritten":214,"seriesCreated":2,"valuesWritten":2354}}, "ingress:17": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"mem","rp":"autogen"},"values":{"pointsWritten":107,"seriesCreated":1,"valuesWritten":963}}, "ingress:18": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"processes","rp":"autogen"},"values":{"pointsWritten":107,"seriesCreated":1,"valuesWritten":856}}, "ingress:19": 
{"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"swap","rp":"autogen"},"values":{"pointsWritten":214,"seriesCreated":1,"valuesWritten":642}}, "ingress:20": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"system","rp":"autogen"},"values":{"pointsWritten":321,"seriesCreated":1,"valuesWritten":749}}, Closes: https://github.com/influxdata/influxdb/issues/20613
2021-02-05 18:52:43 +00:00
return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids, tracker)
2018-01-09 19:05:37 +00:00
})
}
if err := g.Wait(); err != nil {
return nil, err
2018-01-02 19:20:03 +00:00
}
2017-12-19 17:31:33 +00:00
return ids, nil
}
2017-10-02 14:07:11 +00:00
2017-10-26 19:55:00 +00:00
// DeleteSeriesID flags a series as permanently deleted.
2017-12-19 17:31:33 +00:00
// If the series is reintroduced later then it must create a new id.
func (f *SeriesFile) DeleteSeriesID(id uint64) error {
2018-01-09 19:05:37 +00:00
p := f.SeriesIDPartition(id)
if p == nil {
return ErrInvalidSeriesPartitionID
2017-10-26 19:55:00 +00:00
}
2018-01-09 19:05:37 +00:00
return p.DeleteSeriesID(id)
2017-10-25 13:29:44 +00:00
}
2017-10-26 19:55:00 +00:00
// IsDeleted returns true if the ID has been deleted before.
2017-12-19 17:31:33 +00:00
func (f *SeriesFile) IsDeleted(id uint64) bool {
2018-01-09 19:05:37 +00:00
p := f.SeriesIDPartition(id)
if p == nil {
2018-01-08 16:11:29 +00:00
return false
}
2018-01-09 19:05:37 +00:00
return p.IsDeleted(id)
2017-09-25 15:31:20 +00:00
}
2017-12-19 17:31:33 +00:00
// SeriesKey returns the series key for a given id.
func (f *SeriesFile) SeriesKey(id uint64) []byte {
if id == 0 {
2017-09-14 15:41:58 +00:00
return nil
}
2018-01-09 19:05:37 +00:00
p := f.SeriesIDPartition(id)
if p == nil {
2018-01-08 16:11:29 +00:00
return nil
}
2018-01-09 19:05:37 +00:00
return p.SeriesKey(id)
2017-09-17 18:06:37 +00:00
}
// SeriesKeys looks up the series key for each id. The result has the same
// length and order as ids; each entry is whatever SeriesKey returns for that
// id.
func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte {
	keys := make([][]byte, len(ids))
	for i, id := range ids {
		keys[i] = f.SeriesKey(id)
	}
	return keys
}
2017-09-17 18:06:37 +00:00
// Series returns the parsed series name and tags for an offset.
2017-12-19 17:31:33 +00:00
func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) {
key := f.SeriesKey(id)
2017-09-17 18:06:37 +00:00
if key == nil {
return nil, nil
}
return ParseSeriesKey(key)
2017-09-14 15:41:58 +00:00
}
2017-12-19 17:31:33 +00:00
// SeriesID return the series id for the series.
func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 {
2018-01-09 19:05:37 +00:00
key := AppendSeriesKey(buf[:0], name, tags)
keyPartition := f.SeriesKeyPartition(key)
if keyPartition == nil {
2018-01-08 16:11:29 +00:00
return 0
}
2018-01-09 19:05:37 +00:00
return keyPartition.FindIDBySeriesKey(key)
2017-12-19 17:31:33 +00:00
}
2017-09-14 15:41:58 +00:00
// HasSeries return true if the series exists.
func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
2017-12-29 18:57:30 +00:00
return f.SeriesID(name, tags, buf) > 0
2017-09-14 15:41:58 +00:00
}
// SeriesCount returns the number of series.
2017-09-26 13:40:26 +00:00
func (f *SeriesFile) SeriesCount() uint64 {
2018-01-09 19:05:37 +00:00
var n uint64
for _, p := range f.partitions {
n += p.SeriesCount()
2018-01-08 16:11:29 +00:00
}
2017-10-02 14:07:11 +00:00
return n
2017-09-14 15:41:58 +00:00
}
// SeriesIDIterator returns an iterator over all the series.
2017-11-15 23:09:25 +00:00
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
2017-12-20 22:13:34 +00:00
var ids []uint64
2018-01-09 19:05:37 +00:00
for _, p := range f.partitions {
ids = p.AppendSeriesIDs(ids)
2017-11-22 15:30:02 +00:00
}
2018-01-09 19:05:37 +00:00
sort.Sort(uint64Slice(ids))
2017-12-29 18:57:30 +00:00
return NewSeriesIDSliceIterator(ids)
2017-12-19 17:31:33 +00:00
}
2017-11-22 15:30:02 +00:00
2018-01-09 19:05:37 +00:00
func (f *SeriesFile) SeriesIDPartitionID(id uint64) int {
return int((id - 1) % SeriesFilePartitionN)
2017-12-20 22:13:34 +00:00
}
2018-01-09 19:05:37 +00:00
func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition {
partitionID := f.SeriesIDPartitionID(id)
if partitionID >= len(f.partitions) {
return nil
2017-12-20 22:13:34 +00:00
}
2018-01-09 19:05:37 +00:00
return f.partitions[partitionID]
2017-12-27 15:09:36 +00:00
}
2018-01-09 19:05:37 +00:00
func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int {
partitionIDs := make([]int, len(keys))
for i := range keys {
partitionIDs[i] = f.SeriesKeyPartitionID(keys[i])
2017-12-20 22:13:34 +00:00
}
2018-01-09 19:05:37 +00:00
return partitionIDs
2017-11-22 15:30:02 +00:00
}
2018-01-09 19:05:37 +00:00
func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int {
return int(xxhash.Sum64(key) % SeriesFilePartitionN)
2017-12-19 17:31:33 +00:00
}
2017-10-02 14:07:11 +00:00
2018-01-09 19:05:37 +00:00
func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition {
partitionID := f.SeriesKeyPartitionID(key)
if partitionID >= len(f.partitions) {
2017-12-29 18:57:30 +00:00
return nil
2017-10-02 14:07:11 +00:00
}
2018-01-09 19:05:37 +00:00
return f.partitions[partitionID]
2017-10-02 14:07:11 +00:00
}
2017-09-14 15:41:58 +00:00
// AppendSeriesKey serializes name and tags to a byte slice.
// The total length is prepended as a uvarint.
func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte {
2017-09-26 13:40:26 +00:00
buf := make([]byte, binary.MaxVarintLen64)
2017-09-14 15:41:58 +00:00
origLen := len(dst)
// The tag count is variable encoded, so we need to know ahead of time what
// the size of the tag count value will be.
2017-09-26 13:40:26 +00:00
tcBuf := make([]byte, binary.MaxVarintLen64)
2017-09-14 15:41:58 +00:00
tcSz := binary.PutUvarint(tcBuf, uint64(len(tags)))
// Size of name/tags. Does not include total length.
size := 0 + //
2 + // size of measurement
len(name) + // measurement
tcSz + // size of number of tags
(4 * len(tags)) + // length of each tag key and value
tags.Size() // size of tag keys/values
// Variable encode length.
totalSz := binary.PutUvarint(buf, uint64(size))
// If caller doesn't provide a buffer then pre-allocate an exact one.
if dst == nil {
dst = make([]byte, 0, size+totalSz)
}
// Append total length.
dst = append(dst, buf[:totalSz]...)
// Append name.
binary.BigEndian.PutUint16(buf, uint16(len(name)))
dst = append(dst, buf[:2]...)
dst = append(dst, name...)
// Append tag count.
dst = append(dst, tcBuf[:tcSz]...)
// Append tags.
for _, tag := range tags {
binary.BigEndian.PutUint16(buf, uint16(len(tag.Key)))
dst = append(dst, buf[:2]...)
dst = append(dst, tag.Key...)
binary.BigEndian.PutUint16(buf, uint16(len(tag.Value)))
dst = append(dst, buf[:2]...)
dst = append(dst, tag.Value...)
}
// Verify that the total length equals the encoded byte count.
if got, exp := len(dst)-origLen, size+totalSz; got != exp {
panic(fmt.Sprintf("series key encoding does not match calculated total length: actual=%d, exp=%d, key=%x", got, exp, dst))
}
return dst
}
// ReadSeriesKey returns the series key from the beginning of the buffer.
//
// The returned key includes its uvarint length prefix; remainder is whatever
// follows the key in data. The input is not validated: slicing panics if data
// is shorter than the encoded length.
func ReadSeriesKey(data []byte) (key, remainder []byte) {
	sz, n := binary.Uvarint(data)
	end := int(sz) + n
	return data[:end], data[end:]
}
// ReadSeriesKeyLen decodes the uvarint length prefix at the start of data and
// returns it together with the bytes that follow the prefix. The decode
// result is not validated.
func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) {
	sz64, i := binary.Uvarint(data)
	return int(sz64), data[i:]
}
// ReadSeriesKeyMeasurement reads a measurement name from the start of data:
// a 2-byte big-endian length followed by that many bytes. It returns the name
// and the bytes that follow it. It panics if data is too short.
func ReadSeriesKeyMeasurement(data []byte) (name, remainder []byte) {
	n, data := binary.BigEndian.Uint16(data), data[2:]
	return data[:n], data[n:]
}
// ReadSeriesKeyTagN decodes the uvarint tag count at the start of data and
// returns it together with the bytes that follow it. The decode result is not
// validated.
func ReadSeriesKeyTagN(data []byte) (n int, remainder []byte) {
	n64, i := binary.Uvarint(data)
	return int(n64), data[i:]
}
// ReadSeriesKeyTag reads one tag from the start of data. A tag is encoded as
// a 2-byte big-endian key length, the key, a 2-byte big-endian value length,
// and the value. It returns the key, the value and the bytes following the
// tag, and panics if data is too short.
func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) {
	keyLen := binary.BigEndian.Uint16(data)
	data = data[2:]
	key = data[:keyLen]
	data = data[keyLen:]

	valueLen := binary.BigEndian.Uint16(data)
	data = data[2:]
	value = data[:valueLen]
	remainder = data[valueLen:]

	return key, value, remainder
}
2017-09-17 18:06:37 +00:00
// ParseSeriesKey extracts the name & tags from a series key.
func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) {
Reduce allocations in TSI TagSets implementation Since all tag sets are materialised to strings before this method returns, a large number of allocations can be avoided by carefully resuing buffers and containers. This commit reduces allocations by about 75%, which can be very significant for high cardinality workloads. The benchmark results shown below are for a benchmark that asks for all series keys matching `tag5=value0'. name old time/op new time/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 5.66s ± 4% 5.70s ± 5% ~ (p=0.739 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 26.5s ± 8% 26.8s ±12% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 11.9ms ±18% 10.4ms ± 2% -12.81% (p=0.000 n=10+10) IndexSet_TagSets/1M_series/tsi1-8 23.4ms ± 5% 18.9ms ± 1% -19.07% (p=0.000 n=10+9) name old alloc/op new alloc/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 2.50GB ± 0% 2.50GB ± 0% ~ (p=0.315 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 32.6GB ± 0% 32.6GB ± 0% ~ (p=0.247 n=10+10) IndexSet_TagSets/1M_series/inmem-8 3.56MB ± 0% 3.56MB ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 12.7MB ± 0% 5.2MB ± 0% -59.02% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 24.0M ± 0% 24.0M ± 0% ~ (p=0.353 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 96.6M ± 0% 96.7M ± 0% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 51.0 ± 0% 51.0 ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 80.4k ± 0% 20.4k ± 0% -74.65% (p=0.000 n=10+10)
2018-08-09 14:59:37 +00:00
return parseSeriesKey(data, nil)
}
// ParseSeriesKeyInto extracts the name and tags from data, parsing the tags
// into dstTags, which is then returned.
//
// The returned tags may have a different length and capacity than dstTags.
func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {
	name, tags := parseSeriesKey(data, dstTags)
	return name, tags
}
// parseSeriesKey extracts the name and tags from data, attempting to re-use the
// provided tags value rather than allocating. The returned tags may have a
// different length and capacity to those provided.
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
2017-09-14 15:41:58 +00:00
_, data = ReadSeriesKeyLen(data)
2017-09-17 18:06:37 +00:00
name, data = ReadSeriesKeyMeasurement(data)
2017-09-14 15:41:58 +00:00
tagN, data := ReadSeriesKeyTagN(data)
Reduce allocations in TSI TagSets implementation Since all tag sets are materialised to strings before this method returns, a large number of allocations can be avoided by carefully resuing buffers and containers. This commit reduces allocations by about 75%, which can be very significant for high cardinality workloads. The benchmark results shown below are for a benchmark that asks for all series keys matching `tag5=value0'. name old time/op new time/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 5.66s ± 4% 5.70s ± 5% ~ (p=0.739 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 26.5s ± 8% 26.8s ±12% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 11.9ms ±18% 10.4ms ± 2% -12.81% (p=0.000 n=10+10) IndexSet_TagSets/1M_series/tsi1-8 23.4ms ± 5% 18.9ms ± 1% -19.07% (p=0.000 n=10+9) name old alloc/op new alloc/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 2.50GB ± 0% 2.50GB ± 0% ~ (p=0.315 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 32.6GB ± 0% 32.6GB ± 0% ~ (p=0.247 n=10+10) IndexSet_TagSets/1M_series/inmem-8 3.56MB ± 0% 3.56MB ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 12.7MB ± 0% 5.2MB ± 0% -59.02% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 24.0M ± 0% 24.0M ± 0% ~ (p=0.353 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 96.6M ± 0% 96.7M ± 0% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 51.0 ± 0% 51.0 ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 80.4k ± 0% 20.4k ± 0% -74.65% (p=0.000 n=10+10)
2018-08-09 14:59:37 +00:00
dst = dst[:cap(dst)] // Grow dst to use full capacity
if got, want := len(dst), tagN; got < want {
dst = append(dst, make(models.Tags, want-got)...)
} else if got > want {
dst = dst[:want]
Reduce allocations in TSI TagSets implementation Since all tag sets are materialised to strings before this method returns, a large number of allocations can be avoided by carefully resuing buffers and containers. This commit reduces allocations by about 75%, which can be very significant for high cardinality workloads. The benchmark results shown below are for a benchmark that asks for all series keys matching `tag5=value0'. name old time/op new time/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 5.66s ± 4% 5.70s ± 5% ~ (p=0.739 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 26.5s ± 8% 26.8s ±12% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 11.9ms ±18% 10.4ms ± 2% -12.81% (p=0.000 n=10+10) IndexSet_TagSets/1M_series/tsi1-8 23.4ms ± 5% 18.9ms ± 1% -19.07% (p=0.000 n=10+9) name old alloc/op new alloc/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 2.50GB ± 0% 2.50GB ± 0% ~ (p=0.315 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 32.6GB ± 0% 32.6GB ± 0% ~ (p=0.247 n=10+10) IndexSet_TagSets/1M_series/inmem-8 3.56MB ± 0% 3.56MB ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 12.7MB ± 0% 5.2MB ± 0% -59.02% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 24.0M ± 0% 24.0M ± 0% ~ (p=0.353 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 96.6M ± 0% 96.7M ± 0% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 51.0 ± 0% 51.0 ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 80.4k ± 0% 20.4k ± 0% -74.65% (p=0.000 n=10+10)
2018-08-09 14:59:37 +00:00
}
dst = dst[:tagN]
2017-09-14 15:41:58 +00:00
for i := 0; i < tagN; i++ {
var key, value []byte
key, value, data = ReadSeriesKeyTag(data)
Reduce allocations in TSI TagSets implementation Since all tag sets are materialised to strings before this method returns, a large number of allocations can be avoided by carefully resuing buffers and containers. This commit reduces allocations by about 75%, which can be very significant for high cardinality workloads. The benchmark results shown below are for a benchmark that asks for all series keys matching `tag5=value0'. name old time/op new time/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 5.66s ± 4% 5.70s ± 5% ~ (p=0.739 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 26.5s ± 8% 26.8s ±12% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 11.9ms ±18% 10.4ms ± 2% -12.81% (p=0.000 n=10+10) IndexSet_TagSets/1M_series/tsi1-8 23.4ms ± 5% 18.9ms ± 1% -19.07% (p=0.000 n=10+9) name old alloc/op new alloc/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 2.50GB ± 0% 2.50GB ± 0% ~ (p=0.315 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 32.6GB ± 0% 32.6GB ± 0% ~ (p=0.247 n=10+10) IndexSet_TagSets/1M_series/inmem-8 3.56MB ± 0% 3.56MB ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 12.7MB ± 0% 5.2MB ± 0% -59.02% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 24.0M ± 0% 24.0M ± 0% ~ (p=0.353 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 96.6M ± 0% 96.7M ± 0% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 51.0 ± 0% 51.0 ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 80.4k ± 0% 20.4k ± 0% -74.65% (p=0.000 n=10+10)
2018-08-09 14:59:37 +00:00
dst[i].Key, dst[i].Value = key, value
2017-09-14 15:41:58 +00:00
}
Reduce allocations in TSI TagSets implementation Since all tag sets are materialised to strings before this method returns, a large number of allocations can be avoided by carefully resuing buffers and containers. This commit reduces allocations by about 75%, which can be very significant for high cardinality workloads. The benchmark results shown below are for a benchmark that asks for all series keys matching `tag5=value0'. name old time/op new time/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 5.66s ± 4% 5.70s ± 5% ~ (p=0.739 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 26.5s ± 8% 26.8s ±12% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 11.9ms ±18% 10.4ms ± 2% -12.81% (p=0.000 n=10+10) IndexSet_TagSets/1M_series/tsi1-8 23.4ms ± 5% 18.9ms ± 1% -19.07% (p=0.000 n=10+9) name old alloc/op new alloc/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 2.50GB ± 0% 2.50GB ± 0% ~ (p=0.315 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 32.6GB ± 0% 32.6GB ± 0% ~ (p=0.247 n=10+10) IndexSet_TagSets/1M_series/inmem-8 3.56MB ± 0% 3.56MB ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 12.7MB ± 0% 5.2MB ± 0% -59.02% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Index_ConcurrentWriteQuery/inmem/queries_100000-8 24.0M ± 0% 24.0M ± 0% ~ (p=0.353 n=10+10) Index_ConcurrentWriteQuery/tsi1/queries_100000-8 96.6M ± 0% 96.7M ± 0% ~ (p=0.579 n=10+10) IndexSet_TagSets/1M_series/inmem-8 51.0 ± 0% 51.0 ± 0% ~ (all equal) IndexSet_TagSets/1M_series/tsi1-8 80.4k ± 0% 20.4k ± 0% -74.65% (p=0.000 n=10+10)
2018-08-09 14:59:37 +00:00
return name, dst
2017-09-14 15:41:58 +00:00
}
func CompareSeriesKeys(a, b []byte) int {
// Handle 'nil' keys.
if len(a) == 0 && len(b) == 0 {
return 0
} else if len(a) == 0 {
return -1
} else if len(b) == 0 {
return 1
}
// Read total size.
2017-09-17 18:06:37 +00:00
_, a = ReadSeriesKeyLen(a)
_, b = ReadSeriesKeyLen(b)
2017-09-14 15:41:58 +00:00
// Read names.
name0, a := ReadSeriesKeyMeasurement(a)
name1, b := ReadSeriesKeyMeasurement(b)
// Compare names, return if not equal.
if cmp := bytes.Compare(name0, name1); cmp != 0 {
return cmp
}
// Read tag counts.
tagN0, a := ReadSeriesKeyTagN(a)
tagN1, b := ReadSeriesKeyTagN(b)
// Compare each tag in order.
2017-09-17 18:06:37 +00:00
for i := 0; ; i++ {
2017-09-14 15:41:58 +00:00
// Check for EOF.
if i == tagN0 && i == tagN1 {
return 0
} else if i == tagN0 {
return -1
} else if i == tagN1 {
return 1
}
// Read keys.
var key0, key1, value0, value1 []byte
key0, value0, a = ReadSeriesKeyTag(a)
key1, value1, b = ReadSeriesKeyTag(b)
// Compare keys & values.
if cmp := bytes.Compare(key0, key1); cmp != 0 {
return cmp
} else if cmp := bytes.Compare(value0, value1); cmp != 0 {
return cmp
}
}
}
2018-01-09 19:05:37 +00:00
// GenerateSeriesKeys generates series keys for a list of names & tags using
// a single large memory block.
func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte {
buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice))
keys := make([][]byte, len(names))
for i := range names {
offset := len(buf)
buf = AppendSeriesKey(buf, names[i], tagsSlice[i])
keys[i] = buf[offset:]
2017-10-02 14:07:11 +00:00
}
2018-01-09 19:05:37 +00:00
return keys
2017-12-21 21:50:07 +00:00
}
2018-01-09 19:05:37 +00:00
// SeriesKeysSize returns the number of bytes required to encode a list of name/tags.
func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int {
var n int
for i := range names {
n += SeriesKeySize(names[i], tagsSlice[i])
2017-12-27 15:09:36 +00:00
}
2018-01-09 19:05:37 +00:00
return n
2017-12-27 15:09:36 +00:00
}
2017-10-02 14:07:11 +00:00
2018-01-09 19:05:37 +00:00
// SeriesKeySize returns the number of bytes required to encode a series key.
func SeriesKeySize(name []byte, tags models.Tags) int {
var n int
n += 2 + len(name)
n += binaryutil.UvarintSize(uint64(len(tags)))
for _, tag := range tags {
n += 2 + len(tag.Key)
n += 2 + len(tag.Value)
2017-12-27 15:09:36 +00:00
}
2018-01-09 19:05:37 +00:00
n += binaryutil.UvarintSize(uint64(n))
return n
2017-12-27 15:09:36 +00:00
}
2018-01-09 19:05:37 +00:00
type seriesKeys [][]byte
2017-10-02 14:07:11 +00:00
2018-01-09 19:05:37 +00:00
func (a seriesKeys) Len() int { return len(a) }
func (a seriesKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesKeys) Less(i, j int) bool {
return CompareSeriesKeys(a[i], a[j]) == -1
2017-10-02 14:07:11 +00:00
}
2018-01-09 19:05:37 +00:00
// uint64Slice is a []uint64 with Len, Swap and Less methods that order its
// elements ascending.
type uint64Slice []uint64

// Len reports the number of elements.
func (s uint64Slice) Len() int {
	return len(s)
}

// Swap exchanges the elements at positions i and j.
func (s uint64Slice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// Less reports whether the element at i is smaller than the element at j.
func (s uint64Slice) Less(i, j int) bool {
	return s[i] < s[j]
}
// nop does nothing. Retain returns it as the release func for a nil receiver.
func nop() {
}