fix(tsdb): minimize lock contention when adding new fields or measurements (#21228)
fields.idx frequent writes cause lock contention and fields.idx is recreated when a field or measurement is added in a WritePointsWithContext() This eliminates locking during the actual file rewrite, and limits it to the times when the MeasurementFieldSet is actually being read or written in memory and when the new file is being renamed. Test verification of correct behavior by checking the fields.idx file matches the in-memory copy after heavily parallel measurement addition. Co-authored-by: davidby-influx <72418212+davidby-influx@users.noreply.github.com>pull/21235/head
parent
de1d8dc2d6
commit
7b1763e791
|
@ -73,6 +73,7 @@ The prefix used for Prometheus metrics from the query controller has changed fro
|
||||||
1. [20925](https://github.com/influxdata/influxdb/pull/20925): Fix parse error in UI for tag filters containing regex meta characters.
|
1. [20925](https://github.com/influxdata/influxdb/pull/20925): Fix parse error in UI for tag filters containing regex meta characters.
|
||||||
1. [21042](https://github.com/influxdata/influxdb/pull/21042): Prevent concurrent access panic when gathering bolt metrics.
|
1. [21042](https://github.com/influxdata/influxdb/pull/21042): Prevent concurrent access panic when gathering bolt metrics.
|
||||||
1. [21127](https://github.com/influxdata/influxdb/pull/21127): Fix race condition in Flux controller shutdown.
|
1. [21127](https://github.com/influxdata/influxdb/pull/21127): Fix race condition in Flux controller shutdown.
|
||||||
|
1. [21228](https://github.com/influxdata/influxdb/pull/21228): Reduce lock contention when adding new fields and measurements.
|
||||||
|
|
||||||
## v2.0.4 [2021-02-08]
|
## v2.0.4 [2021-02-08]
|
||||||
|
|
||||||
|
|
114
tsdb/shard.go
114
tsdb/shard.go
|
@ -12,6 +12,7 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -716,11 +717,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(fieldsToCreate) > 0 {
|
|
||||||
return engine.MeasurementFieldSet().Save()
|
return engine.MeasurementFieldSet().Save()
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
|
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
|
||||||
|
@ -1643,9 +1640,11 @@ func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataT
|
||||||
type MeasurementFieldSet struct {
|
type MeasurementFieldSet struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
fields map[string]*MeasurementFields
|
fields map[string]*MeasurementFields
|
||||||
|
|
||||||
// path is the location to persist field sets
|
// path is the location to persist field sets
|
||||||
path string
|
path string
|
||||||
|
// ephemeral counters for updating the file on disk
|
||||||
|
memoryVersion uint64
|
||||||
|
writtenVersion uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
|
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
|
||||||
|
@ -1653,6 +1652,8 @@ func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
|
||||||
fs := &MeasurementFieldSet{
|
fs := &MeasurementFieldSet{
|
||||||
fields: make(map[string]*MeasurementFields),
|
fields: make(map[string]*MeasurementFields),
|
||||||
path: path,
|
path: path,
|
||||||
|
memoryVersion: 0,
|
||||||
|
writtenVersion: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there is a load error, return the error and an empty set so
|
// If there is a load error, return the error and an empty set so
|
||||||
|
@ -1737,21 +1738,41 @@ func (fs *MeasurementFieldSet) IsEmpty() bool {
|
||||||
return len(fs.fields) == 0
|
return len(fs.fields) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *MeasurementFieldSet) Save() error {
|
func (fs *MeasurementFieldSet) Save() (err error) {
|
||||||
|
// current version
|
||||||
|
var v uint64
|
||||||
|
// Is the MeasurementFieldSet empty?
|
||||||
|
isEmpty := false
|
||||||
|
// marshaled MeasurementFieldSet
|
||||||
|
|
||||||
|
b, err := func() ([]byte, error) {
|
||||||
fs.mu.Lock()
|
fs.mu.Lock()
|
||||||
defer fs.mu.Unlock()
|
defer fs.mu.Unlock()
|
||||||
|
fs.memoryVersion += 1
|
||||||
return fs.saveNoLock()
|
v = fs.memoryVersion
|
||||||
}
|
// If no fields left, remove the fields index file
|
||||||
|
|
||||||
func (fs *MeasurementFieldSet) saveNoLock() error {
|
|
||||||
// No fields left, remove the fields index file
|
|
||||||
if len(fs.fields) == 0 {
|
if len(fs.fields) == 0 {
|
||||||
return os.RemoveAll(fs.path)
|
isEmpty = true
|
||||||
|
if err := os.RemoveAll(fs.path); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
fs.writtenVersion = fs.memoryVersion
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fs.marshalMeasurementFieldSetNoLock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if isEmpty {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the new index to a temp file and rename when it's sync'd
|
// Write the new index to a temp file and rename when it's sync'd
|
||||||
path := fs.path + ".tmp"
|
// if it is still the most recent memoryVersion of the MeasurementFields
|
||||||
|
path := fs.path + "." + strconv.FormatUint(v, 10) + ".tmp"
|
||||||
|
|
||||||
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
|
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1762,28 +1783,6 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
pb := internal.MeasurementFieldSet{
|
|
||||||
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
|
|
||||||
}
|
|
||||||
for name, mf := range fs.fields {
|
|
||||||
fs := &internal.MeasurementFields{
|
|
||||||
Name: []byte(name),
|
|
||||||
Fields: make([]*internal.Field, 0, mf.FieldN()),
|
|
||||||
}
|
|
||||||
|
|
||||||
mf.ForEachField(func(field string, typ influxql.DataType) bool {
|
|
||||||
fs.Fields = append(fs.Fields, &internal.Field{Name: []byte(field), Type: int32(typ)})
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
pb.Measurements = append(pb.Measurements, fs)
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := proto.Marshal(&pb)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := fd.Write(b); err != nil {
|
if _, err := fd.Write(b); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1797,11 +1796,52 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fs.mu.Lock()
|
||||||
|
defer fs.mu.Unlock()
|
||||||
|
|
||||||
|
// Check if a later modification and save of fields has superseded ours
|
||||||
|
// If so, we are successfully done! We were beaten by a later call
|
||||||
|
// to this function
|
||||||
|
if fs.writtenVersion > v {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err := file.RenameFile(path, fs.path); err != nil {
|
if err := file.RenameFile(path, fs.path); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return file.SyncDir(filepath.Dir(fs.path))
|
if err = file.SyncDir(filepath.Dir(fs.path)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Update the written version to the current version
|
||||||
|
fs.writtenVersion = v
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled []byte, err error) {
|
||||||
|
pb := internal.MeasurementFieldSet{
|
||||||
|
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, mf := range fs.fields {
|
||||||
|
imf := &internal.MeasurementFields{
|
||||||
|
Name: []byte(name),
|
||||||
|
Fields: make([]*internal.Field, 0, mf.FieldN()),
|
||||||
|
}
|
||||||
|
|
||||||
|
mf.ForEachField(func(field string, typ influxql.DataType) bool {
|
||||||
|
imf.Fields = append(imf.Fields, &internal.Field{Name: []byte(field), Type: int32(typ)})
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
pb.Measurements = append(pb.Measurements, imf)
|
||||||
|
}
|
||||||
|
b, err := proto.Marshal(&pb)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *MeasurementFieldSet) load() error {
|
func (fs *MeasurementFieldSet) load() error {
|
||||||
|
|
|
@ -1653,6 +1653,73 @@ func TestMeasurementFieldSet_InvalidFormat(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
|
||||||
|
var iterations int
|
||||||
|
dir, cleanup := MustTempDir()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
if testing.Short() {
|
||||||
|
iterations = 50
|
||||||
|
} else {
|
||||||
|
iterations = 200
|
||||||
|
}
|
||||||
|
|
||||||
|
mt := []string{"cpu", "dpu", "epu", "fpu"}
|
||||||
|
ft := make([][]string, len(mt))
|
||||||
|
for mi, m := range mt {
|
||||||
|
ft[mi] = make([]string, iterations)
|
||||||
|
for i := 0; i < iterations; i += 1 {
|
||||||
|
ft[mi][i] = fmt.Sprintf("%s_%s_%d", m, "value", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
path := filepath.Join(dir, "fields.idx")
|
||||||
|
mfs, err := tsdb.NewMeasurementFieldSet(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewMeasurementFieldSet error: %v", err)
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Add(len(ft))
|
||||||
|
for i, fs := range ft {
|
||||||
|
go testFieldMaker(t, &wg, mfs, mt[i], fs)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
mfs2, err := tsdb.NewMeasurementFieldSet(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewMeasurementFieldSet error: %v", err)
|
||||||
|
}
|
||||||
|
for i, fs := range ft {
|
||||||
|
mf := mfs.Fields([]byte(mt[i]))
|
||||||
|
mf2 := mfs2.Fields([]byte(mt[i]))
|
||||||
|
for _, f := range fs {
|
||||||
|
if mf2.Field(f) == nil {
|
||||||
|
t.Fatalf("Created field not found on reloaded MeasurementFieldSet %s", f)
|
||||||
|
}
|
||||||
|
if mf.Field(f) == nil {
|
||||||
|
t.Fatalf("Created field not found in original MeasureMentFieldSet: %s", f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldSet, measurement string, fieldNames []string) {
|
||||||
|
defer wg.Done()
|
||||||
|
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
|
||||||
|
for _, fieldName := range fieldNames {
|
||||||
|
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
|
||||||
|
t.Errorf("create field error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := mf.Save(); err != nil {
|
||||||
|
t.Errorf("save error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) }
|
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) }
|
||||||
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) }
|
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) }
|
||||||
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) }
|
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) }
|
||||||
|
|
Loading…
Reference in New Issue