fix(tsdb): minimize lock contention when adding new fields or measurements (#20504)

Frequent writes to fields.idx cause lock contention, and fields.idx is recreated
whenever a field or measurement is added in a WritePointsWithContext() call.
This change eliminates locking during the actual file rewrite and limits it to
the times when the MeasurementFieldSet is actually being read or written
in memory and when the new file is being renamed.

The test verifies correct behavior by checking that the fields.idx
file matches the in-memory copy after heavily parallel measurement addition.

Fixes https://github.com/influxdata/influxdb/issues/20500
pull/20636/head
davidby-influx 2021-01-15 08:31:45 -08:00 committed by GitHub
parent 415361e1eb
commit fe3af66c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 146 additions and 41 deletions

View File

@ -12,6 +12,7 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@ -760,11 +761,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
s.index.SetFieldName(f.Measurement, f.Field.Name)
}
if len(fieldsToCreate) > 0 {
return engine.MeasurementFieldSet().Save()
}
return nil
return engine.MeasurementFieldSet().Save()
}
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
@ -1647,16 +1644,20 @@ func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataT
// MeasurementFieldSet holds the MeasurementFields of every known measurement,
// keyed by measurement name, together with the location of the file the set
// is persisted to.
type MeasurementFieldSet struct {
	// mu is held by Save while it reads fields and updates the version
	// counters below.
	mu     sync.RWMutex
	fields map[string]*MeasurementFields
	// path is the location to persist field sets
	path string
	// ephemeral counters for updating the file on disk
	// memoryVersion is incremented by every call to Save.
	memoryVersion uint64
	// writtenVersion is set by Save to the version it last renamed into
	// place at path, or to memoryVersion when the file is removed because
	// the set is empty.
	writtenVersion uint64
}
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
path: path,
fields: make(map[string]*MeasurementFields),
path: path,
memoryVersion: 0,
writtenVersion: 0,
}
// If there is a load error, return the error and an empty set so
@ -1741,21 +1742,41 @@ func (fs *MeasurementFieldSet) IsEmpty() bool {
return len(fs.fields) == 0
}
func (fs *MeasurementFieldSet) Save() error {
fs.mu.Lock()
defer fs.mu.Unlock()
func (fs *MeasurementFieldSet) Save() (err error) {
// current version
var v uint64
// Is the MeasurementFieldSet empty?
isEmpty := false
// marshaled MeasurementFieldSet
return fs.saveNoLock()
}
b, err := func() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.memoryVersion += 1
v = fs.memoryVersion
// If no fields left, remove the fields index file
if len(fs.fields) == 0 {
isEmpty = true
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
} else {
fs.writtenVersion = fs.memoryVersion
return nil, nil
}
}
return fs.marshalMeasurementFieldSetNoLock()
}()
func (fs *MeasurementFieldSet) saveNoLock() error {
// No fields left, remove the fields index file
if len(fs.fields) == 0 {
return os.RemoveAll(fs.path)
if err != nil {
return err
} else if isEmpty {
return nil
}
// Write the new index to a temp file and rename when it's sync'd
path := fs.path + ".tmp"
// if it is still the most recent memoryVersion of the MeasurementFields
path := fs.path + "." + strconv.FormatUint(v, 10) + ".tmp"
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
@ -1766,28 +1787,6 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
return err
}
pb := internal.MeasurementFieldSet{
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
}
for name, mf := range fs.fields {
fs := &internal.MeasurementFields{
Name: []byte(name),
Fields: make([]*internal.Field, 0, mf.FieldN()),
}
mf.ForEachField(func(field string, typ influxql.DataType) bool {
fs.Fields = append(fs.Fields, &internal.Field{Name: []byte(field), Type: int32(typ)})
return true
})
pb.Measurements = append(pb.Measurements, fs)
}
b, err := proto.Marshal(&pb)
if err != nil {
return err
}
if _, err := fd.Write(b); err != nil {
return err
}
@ -1801,11 +1800,52 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
return err
}
fs.mu.Lock()
defer fs.mu.Unlock()
// Check if a later modification and save of fields has superseded ours
// If so, we are successfully done! We were beaten by a later call
// to this function
if fs.writtenVersion > v {
return nil
}
if err := file.RenameFile(path, fs.path); err != nil {
return err
}
return file.SyncDir(filepath.Dir(fs.path))
if err = file.SyncDir(filepath.Dir(fs.path)); err != nil {
return err
}
// Update the written version to the current version
fs.writtenVersion = v
return nil
}
// marshalMeasurementFieldSetNoLock serializes every measurement in fs.fields,
// with the name and data type of each of its fields, into the protobuf
// encoding of internal.MeasurementFieldSet.
//
// It does not acquire fs.mu itself; Save calls it while already holding the
// lock. It returns the encoded bytes, or a nil slice and the error reported
// by proto.Marshal.
func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled []byte, err error) {
	measurements := make([]*internal.MeasurementFields, 0, len(fs.fields))
	for name, mf := range fs.fields {
		// Gather this measurement's fields first, then wrap them in the
		// protobuf message for the measurement.
		fields := make([]*internal.Field, 0, mf.FieldN())
		mf.ForEachField(func(field string, typ influxql.DataType) bool {
			fields = append(fields, &internal.Field{Name: []byte(field), Type: int32(typ)})
			return true
		})
		measurements = append(measurements, &internal.MeasurementFields{
			Name:   []byte(name),
			Fields: fields,
		})
	}

	pb := internal.MeasurementFieldSet{Measurements: measurements}
	marshalled, err = proto.Marshal(&pb)
	if err != nil {
		return nil, err
	}
	return marshalled, nil
}
func (fs *MeasurementFieldSet) load() error {

View File

@ -1696,6 +1696,71 @@ func TestMeasurementFieldSet_InvalidFormat(t *testing.T) {
}
}
// TestMeasurementFieldSet_ConcurrentSave creates fields on four measurements
// from four goroutines, each calling Save after every new field, and then
// checks that every created field is present both in the in-memory
// MeasurementFieldSet and in a second MeasurementFieldSet loaded from the same
// fields.idx path.
func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	// Fewer fields per measurement under -short.
	iterations := 200
	if testing.Short() {
		iterations = 50
	}

	// ft[i] holds the field names to create on measurement mt[i].
	mt := []string{"cpu", "dpu", "epu", "fpu"}
	ft := make([][]string, len(mt))
	for mi, m := range mt {
		ft[mi] = make([]string, iterations)
		for i := range ft[mi] {
			ft[mi][i] = fmt.Sprintf("%s_%s_%d", m, "value", i)
		}
	}

	path := filepath.Join(dir, "fields.idx")
	mfs, err := tsdb.NewMeasurementFieldSet(path)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}

	// One writer goroutine per measurement, all saving the same set.
	var wg sync.WaitGroup
	wg.Add(len(ft))
	for i, fs := range ft {
		go testFieldMaker(t, &wg, mfs, mt[i], fs)
	}
	wg.Wait()

	// Reload from disk to compare the persisted state with memory.
	mfs2, err := tsdb.NewMeasurementFieldSet(path)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}
	for i, fs := range ft {
		mf := mfs.Fields([]byte(mt[i]))
		mf2 := mfs2.Fields([]byte(mt[i]))
		// NOTE(review): Fields presumably returns nil for an unknown
		// measurement; fail with a message instead of dereferencing it below.
		if mf2 == nil {
			t.Fatalf("Created measurement not found on reloaded MeasurementFieldSet %s", mt[i])
		}
		if mf == nil {
			t.Fatalf("Created measurement not found in original MeasurementFieldSet: %s", mt[i])
		}
		for _, f := range fs {
			if mf2.Field(f) == nil {
				t.Fatalf("Created field not found on reloaded MeasurementFieldSet %s", f)
			}
			if mf.Field(f) == nil {
				t.Fatalf("Created field not found in original MeasureMentFieldSet: %s", f)
			}
		}
	}
}
// testFieldMaker creates each name in fieldNames as a Float field on
// measurement within mf, calling mf.Save after every field, and marks wg done
// when it returns.
//
// The test starts it on its own goroutine, so failures are reported with
// t.Errorf followed by return: t.Fatalf (FailNow) may only be called from the
// goroutine running the test function.
func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldSet, measurement string, fieldNames []string) {
	defer wg.Done()
	fields := mf.CreateFieldsIfNotExists([]byte(measurement))
	for _, fieldName := range fieldNames {
		if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
			t.Errorf("create field error: %v", err)
			return
		}
		if err := mf.Save(); err != nil {
			t.Errorf("save error: %v", err)
			return
		}
	}
}
// BenchmarkWritePoints_NewSeries_1K runs benchmarkWritePoints with the
// arguments (38, 3, 3, 1).
// NOTE(review): the meaning of the numeric arguments is defined by
// benchmarkWritePoints, which is not visible here.
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) {
	benchmarkWritePoints(b, 38, 3, 3, 1)
}
// BenchmarkWritePoints_NewSeries_100K runs benchmarkWritePoints with the
// arguments (32, 5, 5, 1).
// NOTE(review): the meaning of the numeric arguments is defined by
// benchmarkWritePoints, which is not visible here.
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) {
	benchmarkWritePoints(b, 32, 5, 5, 1)
}
// BenchmarkWritePoints_NewSeries_250K runs benchmarkWritePoints with the
// arguments (80, 5, 5, 1).
// NOTE(review): the meaning of the numeric arguments is defined by
// benchmarkWritePoints, which is not visible here.
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) {
	benchmarkWritePoints(b, 80, 5, 5, 1)
}