fix: ensure fields in memory match on disk

* chore: refactor field creation for maintainability

Address review comments in the port work of the
field creation. Also fixes one bug in returning the wrong
error.

(cherry picked from commit 5f576331d3)

closes https://github.com/influxdata/influxdb/issues/26035

* fix: ensure fields in memory match on disk

A field could be created in  memory but not
saved to disk if a later field in that
point was invalid (type conflict, too big)
Ensure that if a field is created, it is
saved.

(cherry picked from commit 083b679b56)
DSB_compaction_clean_2
davidby-influx 2025-02-24 15:39:32 -08:00 committed by GitHub
parent 26170d4e57
commit 248022800a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 85 additions and 107 deletions

View File

@ -1264,7 +1264,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := models.ParseName(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
if _, _, err := mf.CreateFieldIfNotExists(string(field), fieldTypes[i]); err != nil {
return err
}

View File

@ -79,7 +79,7 @@ func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"}))
@ -765,7 +765,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -821,7 +821,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -876,7 +876,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -933,7 +933,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -990,8 +990,8 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("F", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -1049,9 +1049,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("X", influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("Y", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -1895,7 +1895,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -1954,7 +1954,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -2013,7 +2013,7 @@ func TestEngine_CreateIterator_SeriesKey(t *testing.T) {
assert := tassert.New(t)
e := MustOpenEngine(t, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}))
@ -2258,7 +2258,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(b, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
@ -2283,7 +2283,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(b, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
cpus := runtime.GOMAXPROCS(0)
pp := make([]models.Point, 0, sz*cpus)
@ -2515,7 +2515,7 @@ func MustInitDefaultBenchmarkEngine(tb testing.TB, name string, pointN int) *ben
e := MustOpenEngine(tb, tsdb.DefaultIndex)
// Initialize metadata.
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
// Generate time ascending points with jitterred time & value.

View File

@ -2,6 +2,7 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"github.com/influxdata/influxdb/v2/models"
@ -10,14 +11,17 @@ import (
const MaxFieldValueLength = 1048576
// ValidateFields will return a PartialWriteError if:
// ValidateAndCreateFields will return a PartialWriteError if:
// - the point has inconsistent fields, or
// - the point has fields that are too long
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) ([]*FieldCreate, error) {
func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) ([]*FieldCreate, *PartialWriteError) {
pointSize := point.StringSize()
iter := point.FieldIterator()
var fieldsToCreate []*FieldCreate
// We return fieldsToCreate even on error, because other writes
// in parallel may depend on these previous fields having been
// created in memory
for iter.Next() {
if !skipSizeValidation {
// Check for size of field too large. Note it is much cheaper to check the whole point size
@ -25,9 +29,9 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
// unescape the string, and must at least parse the string)
if pointSize > MaxFieldValueLength && iter.Type() == models.String {
if sz := len(iter.StringValue()); sz > MaxFieldValueLength {
return nil, PartialWriteError{
return fieldsToCreate, &PartialWriteError{
Reason: fmt.Sprintf(
"input field \"%s\" on measurement \"%s\" is too long, %d > %d",
"input field %q on measurement %q is too long, %d > %d",
iter.FieldKey(), point.Name(), sz, MaxFieldValueLength),
Dropped: 1,
}
@ -46,23 +50,24 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
continue
}
// If the field is not present, remember to create it.
f := mf.FieldBytes(fieldKey)
if f == nil {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: point.Name(),
Field: &Field{
Name: string(fieldKey),
Type: dataType,
}})
} else if f.Type != dataType {
// If the types are not the same, there is a conflict.
return nil, PartialWriteError{
fieldName := string(fieldKey)
f, created, err := mf.CreateFieldIfNotExists(fieldName, dataType)
if errors.Is(err, ErrFieldTypeConflict) {
return fieldsToCreate, &PartialWriteError{
Reason: fmt.Sprintf(
"%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s",
ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type),
"%s: input field %q on measurement %q is type %s, already exists as type %s",
err, fieldName, point.Name(), dataType, f.Type),
Dropped: 1,
}
} else if err != nil {
return fieldsToCreate, &PartialWriteError{
Reason: fmt.Sprintf(
"error adding field %q to measurement %q: %s",
fieldName, point.Name(), err),
Dropped: 1,
}
} else if created {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{point.Name(), f})
}
}
return fieldsToCreate, nil

View File

@ -706,7 +706,7 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr er
}
// add any new fields and keep track of what needs to be saved
if numFieldsCreated, err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
if numFieldsCreated, err := s.saveFieldsAndMeasurements(fieldsToCreate); err != nil {
return err
} else {
s.stats.fieldsCreated.Add(float64(numFieldsCreated))
@ -723,10 +723,10 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr er
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed.
func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, []*FieldCreate, error) {
var (
fieldsToCreate []*FieldCreate
err error
dropped int
reason string // only first error reason is set unless returned from CreateSeriesListIfNotExists
createdFieldsToSave []*FieldCreate
err error
dropped int
reason string // only first error reason is set unless returned from CreateSeriesListIfNotExists
)
// Create all series against the index in bulk.
@ -821,42 +821,28 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
continue
}
err := func(p models.Point, iter models.FieldIterator) error {
var newFields []*FieldCreate
var validateErr error
name := p.Name()
mf := engine.MeasurementFields(name)
// Check with the field validator.
if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
var err PartialWriteError
switch {
case errors.As(validateErr, &err):
// This will turn into an error later, outside this lambda
if reason == "" {
reason = err.Reason
}
dropped += err.Dropped
s.stats.writesDropped.Add(float64(err.Dropped))
default:
return err
}
return nil
}
name := p.Name()
mf := engine.MeasurementFields(name)
// Check with the field validator.
newFields, partialWriteError := ValidateAndCreateFields(mf, p, s.options.Config.SkipFieldSizeValidation)
createdFieldsToSave = append(createdFieldsToSave, newFields...)
points[j] = points[i]
j++
fieldsToCreate = append(fieldsToCreate, newFields...)
return nil
}(p, iter)
if err != nil {
return nil, nil, err
if partialWriteError != nil {
if reason == "" {
reason = partialWriteError.Reason
}
dropped += partialWriteError.Dropped
s.stats.writesDropped.Add(float64(partialWriteError.Dropped))
continue
}
points[j] = points[i]
j++
}
if dropped > 0 {
err = PartialWriteError{Reason: reason, Dropped: dropped}
}
return points[:j], fieldsToCreate, err
return points[:j], createdFieldsToSave, err
}
const unPrintReplRune = '?'
@ -881,8 +867,8 @@ func makePrintable(s string) string {
return b.String()
}
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, error) {
if len(fieldsToCreate) == 0 {
func (s *Shard) saveFieldsAndMeasurements(fieldsToSave []*FieldCreate) (int, error) {
if len(fieldsToSave) == 0 {
return 0, nil
}
@ -892,18 +878,13 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int,
}
numCreated := 0
// add fields
changes := make([]*FieldChange, 0, len(fieldsToCreate))
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return 0, err
} else if created {
numCreated++
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
changes := make([]*FieldChange, 0, len(fieldsToSave))
for _, f := range fieldsToSave {
numCreated++
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
return numCreated, engine.MeasurementFieldSet().Save(changes)
@ -1854,18 +1835,18 @@ func (m *MeasurementFields) bytes() int {
// CreateFieldIfNotExists creates a new field with the given name and type.
// Returns an error if the field already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) {
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType) (f *Field, created bool, err error) {
newField := &Field{
Name: string(name),
Name: name,
Type: typ,
}
if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded {
if f.Type != typ {
return false, ErrFieldTypeConflict
}
return false, nil
var loaded bool
if f, loaded = m.fields.LoadOrStore(newField.Name, newField); f.Type != typ {
// This implies the field existed as a different type already.
return f, false, ErrFieldTypeConflict
} else {
return f, !loaded, nil
}
return true, nil
}
func (m *MeasurementFields) FieldN() int {
@ -1886,14 +1867,6 @@ func (m *MeasurementFields) HasField(name string) bool {
return ok
}
// FieldBytes returns the field for name, or nil if there is no field for name.
// FieldBytes should be preferred to Field when the caller has a []byte, because
// it avoids a string allocation, which can't be avoided if the caller converts
// the []byte to a string and calls Field.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
return m.Field(string(name))
}
// FieldSet returns the set of fields and their types for the measurement.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
fieldTypes := make(map[string]influxql.DataType)
@ -2531,7 +2504,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
fs.Delete(string(fc.Measurement))
} else {
mf := fs.CreateFieldsIfNotExists(fc.Measurement)
if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
if _, _, err := mf.CreateFieldIfNotExists(fc.Field.Name, fc.Field.Type); err != nil {
err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err)
log.Error("field creation", zap.Error(err))
return err
@ -2544,8 +2517,8 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
// Field represents a series field. All of the fields must be hashable.
type Field struct {
Name string `json:"name,omitempty"`
Type influxql.DataType `json:"type,omitempty"`
Name string
Type influxql.DataType
}
type FieldChange struct {

View File

@ -1604,7 +1604,7 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
}
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
@ -1655,7 +1655,7 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) {
measurement := []byte("cpu")
fields := mf.CreateFieldsIfNotExists(measurement)
fieldName := "value"
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
@ -1725,7 +1725,7 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) {
defer checkMeasurementFieldSetClose(t, mf)
for _, f := range testFields {
fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement))
if _, err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil {
if _, _, err := fields.CreateFieldIfNotExists(f.Field, f.FieldType); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
@ -1786,7 +1786,7 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
@ -1935,7 +1935,7 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
for _, fieldName := range fieldNames {
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Logf("create field error: %v", err)
t.Fail()
return