From b72848d43626c353717260f5026fbe7671b7580d Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Thu, 15 Sep 2022 12:15:14 -0700 Subject: [PATCH] feat: optimize saving changes to fields.idx (#23701) (#23728) Instead of writing out the complete fields.idx file when it changes, write out incremental changes that will be applied to the file on close and startup. closes https://github.com/influxdata/influxdb/issues/23653 (cherry picked from commit 80c10c8c04d44afb709f7db84b5580fbb475fe3f) closes https://github.com/influxdata/influxdb/issues/23703 --- tsdb/engine/tsm1/engine.go | 36 ++- tsdb/internal/fieldsindex.pb.go | 243 +++++++++++++-- tsdb/internal/fieldsindex.proto | 15 + tsdb/shard.go | 535 +++++++++++++++++++++++++------- tsdb/shard_test.go | 245 ++++++++++++--- 5 files changed, 880 insertions(+), 194 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 61332ea30c..c6f51c08c2 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -719,7 +719,7 @@ func (e *Engine) Open(ctx context.Context) error { return err } - fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx")) + fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"), e.logger) if err != nil { e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err)) } @@ -763,15 +763,18 @@ func (e *Engine) Close() error { e.mu.Lock() defer e.mu.Unlock() e.done = nil // Ensures that the channel will not be closed again. - e.fieldset.Close() - if err := e.FileStore.Close(); err != nil { - return err + var err error = nil + err = e.fieldset.Close() + if err2 := e.FileStore.Close(); err2 != nil && err == nil { + err = err2 } if e.WALEnabled { - return e.WAL.Close() + if err2 := e.WAL.Close(); err2 != nil && err == nil { + err = err2 + } } - return nil + return err } // WithLogger sets the logger for the engine. @@ -868,7 +871,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { } // Save the field set index so we don't have to rebuild it next time - if err := e.fieldset.Save(); err != nil { + if err := e.fieldset.WriteToFile(); err != nil { return err } @@ -1170,7 +1173,7 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { return err } } - return nil + return e.MeasurementFieldSet().WriteToFile() } // readFileFromBackup copies the next file from the archive into the shard. @@ -1703,19 +1706,20 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min ids.Add(sid) } - fielsetChanged := false + actuallyDeleted := make([]string, 0, len(measurements)) for k := range measurements { if dropped, err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil { return err } else if dropped { - if err := e.cleanupMeasurement([]byte(k)); err != nil { + if deleted, err := e.cleanupMeasurement([]byte(k)); err != nil { return err + } else if deleted { + actuallyDeleted = append(actuallyDeleted, k) } - fielsetChanged = true } } - if fielsetChanged { - if err := e.fieldset.Save(); err != nil { + if len(actuallyDeleted) > 0 { + if err := e.fieldset.Save(tsdb.MeasurementsToFieldChangeDeletions(actuallyDeleted)); err != nil { return err } } @@ -1745,7 +1749,7 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min return nil } -func (e *Engine) cleanupMeasurement(name []byte) error { +func (e *Engine) cleanupMeasurement(name []byte) (deleted bool, err error) { // A sentinel error message to cause DeleteWithLock to not delete the measurement abortErr := fmt.Errorf("measurements still exist") @@ -1776,10 +1780,10 @@ func (e *Engine) cleanupMeasurement(name []byte) error { }); err != nil && err != abortErr { // Something else failed, return it - return err + return false, err } - return nil + return err != abortErr, nil } // DeleteMeasurement deletes a measurement and all related series. diff --git a/tsdb/internal/fieldsindex.pb.go b/tsdb/internal/fieldsindex.pb.go index fda9712b22..66e5295a7d 100644 --- a/tsdb/internal/fieldsindex.pb.go +++ b/tsdb/internal/fieldsindex.pb.go @@ -20,6 +20,52 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ChangeType int32 + +const ( + ChangeType_AddMeasurementField ChangeType = 0 + ChangeType_DeleteMeasurement ChangeType = 1 +) + +// Enum value maps for ChangeType. +var ( + ChangeType_name = map[int32]string{ + 0: "AddMeasurementField", + 1: "DeleteMeasurement", + } + ChangeType_value = map[string]int32{ + "AddMeasurementField": 0, + "DeleteMeasurement": 1, + } +) + +func (x ChangeType) Enum() *ChangeType { + p := new(ChangeType) + *p = x + return p +} + +func (x ChangeType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ChangeType) Descriptor() protoreflect.EnumDescriptor { + return file_internal_fieldsindex_proto_enumTypes[0].Descriptor() +} + +func (ChangeType) Type() protoreflect.EnumType { + return &file_internal_fieldsindex_proto_enumTypes[0] +} + +func (x ChangeType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ChangeType.Descriptor instead. +func (ChangeType) EnumDescriptor() ([]byte, []int) { + return file_internal_fieldsindex_proto_rawDescGZIP(), []int{0} +} + type Series struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -287,6 +333,116 @@ func (x *MeasurementFieldSet) GetMeasurements() []*MeasurementFields { return nil } +type MeasurementFieldChange struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Measurement []byte `protobuf:"bytes,1,opt,name=Measurement,proto3" json:"Measurement,omitempty"` + Field *Field `protobuf:"bytes,2,opt,name=Field,proto3" json:"Field,omitempty"` + Change ChangeType `protobuf:"varint,3,opt,name=Change,proto3,enum=tsdb.ChangeType" json:"Change,omitempty"` +} + +func (x *MeasurementFieldChange) Reset() { + *x = MeasurementFieldChange{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_fieldsindex_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MeasurementFieldChange) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MeasurementFieldChange) ProtoMessage() {} + +func (x *MeasurementFieldChange) ProtoReflect() protoreflect.Message { + mi := &file_internal_fieldsindex_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MeasurementFieldChange.ProtoReflect.Descriptor instead. +func (*MeasurementFieldChange) Descriptor() ([]byte, []int) { + return file_internal_fieldsindex_proto_rawDescGZIP(), []int{5} +} + +func (x *MeasurementFieldChange) GetMeasurement() []byte { + if x != nil { + return x.Measurement + } + return nil +} + +func (x *MeasurementFieldChange) GetField() *Field { + if x != nil { + return x.Field + } + return nil +} + +func (x *MeasurementFieldChange) GetChange() ChangeType { + if x != nil { + return x.Change + } + return ChangeType_AddMeasurementField +} + +type FieldChangeSet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Changes []*MeasurementFieldChange `protobuf:"bytes,1,rep,name=Changes,proto3" json:"Changes,omitempty"` +} + +func (x *FieldChangeSet) Reset() { + *x = FieldChangeSet{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_fieldsindex_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FieldChangeSet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FieldChangeSet) ProtoMessage() {} + +func (x *FieldChangeSet) ProtoReflect() protoreflect.Message { + mi := &file_internal_fieldsindex_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FieldChangeSet.ProtoReflect.Descriptor instead. +func (*FieldChangeSet) Descriptor() ([]byte, []int) { + return file_internal_fieldsindex_proto_rawDescGZIP(), []int{6} +} + +func (x *FieldChangeSet) GetChanges() []*MeasurementFieldChange { + if x != nil { + return x.Changes + } + return nil +} + var File_internal_fieldsindex_proto protoreflect.FileDescriptor var file_internal_fieldsindex_proto_rawDesc = []byte{ @@ -311,9 +467,26 @@ var file_internal_fieldsindex_proto_rawDesc = []byte{ 0x65, 0x74, 0x12, 0x3b, 0x0a, 0x0c, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x74, 0x73, 0x64, 0x62, 0x2e, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x46, 0x69, 0x65, 0x6c, 0x64, - 0x73, 0x52, 0x0c, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x42, - 0x08, 0x5a, 0x06, 0x2e, 0x3b, 0x74, 0x73, 0x64, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x73, 0x52, 0x0c, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x22, + 0x87, 0x01, 0x0a, 0x16, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x46, + 0x69, 0x65, 0x6c, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x4d, 0x65, + 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x0b, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x05, + 0x46, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x74, 0x73, + 0x64, 0x62, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x05, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, + 0x28, 0x0a, 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x10, 0x2e, 0x74, 0x73, 0x64, 0x62, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x54, 0x79, 0x70, + 0x65, 0x52, 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x22, 0x48, 0x0a, 0x0e, 0x46, 0x69, 0x65, + 0x6c, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, 0x36, 0x0a, 0x07, 0x43, + 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x74, + 0x73, 0x64, 0x62, 0x2e, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x46, + 0x69, 0x65, 0x6c, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x43, 0x68, 0x61, 0x6e, + 0x67, 0x65, 0x73, 0x2a, 0x3c, 0x0a, 0x0a, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x17, 0x0a, 0x13, 0x41, 0x64, 0x64, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, + 0x65, 0x6e, 0x74, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x10, 0x00, 0x12, 0x15, 0x0a, 0x11, 0x44, 0x65, + 0x6c, 0x65, 0x74, 0x65, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x10, + 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x3b, 0x74, 0x73, 0x64, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -328,23 +501,30 @@ func file_internal_fieldsindex_proto_rawDescGZIP() []byte { return file_internal_fieldsindex_proto_rawDescData } -var file_internal_fieldsindex_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_internal_fieldsindex_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_internal_fieldsindex_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_internal_fieldsindex_proto_goTypes = []interface{}{ - (*Series)(nil), // 0: tsdb.Series - (*Tag)(nil), // 1: tsdb.Tag - (*MeasurementFields)(nil), // 2: tsdb.MeasurementFields - (*Field)(nil), // 3: tsdb.Field - (*MeasurementFieldSet)(nil), // 4: tsdb.MeasurementFieldSet + (ChangeType)(0), // 0: tsdb.ChangeType + (*Series)(nil), // 1: tsdb.Series + (*Tag)(nil), // 2: tsdb.Tag + (*MeasurementFields)(nil), // 3: tsdb.MeasurementFields + (*Field)(nil), // 4: tsdb.Field + (*MeasurementFieldSet)(nil), // 5: tsdb.MeasurementFieldSet + (*MeasurementFieldChange)(nil), // 6: tsdb.MeasurementFieldChange + (*FieldChangeSet)(nil), // 7: tsdb.FieldChangeSet } var file_internal_fieldsindex_proto_depIdxs = []int32{ - 1, // 0: tsdb.Series.Tags:type_name -> tsdb.Tag - 3, // 1: tsdb.MeasurementFields.Fields:type_name -> tsdb.Field - 2, // 2: tsdb.MeasurementFieldSet.Measurements:type_name -> tsdb.MeasurementFields - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 2, // 0: tsdb.Series.Tags:type_name -> tsdb.Tag + 4, // 1: tsdb.MeasurementFields.Fields:type_name -> tsdb.Field + 3, // 2: tsdb.MeasurementFieldSet.Measurements:type_name -> tsdb.MeasurementFields + 4, // 3: tsdb.MeasurementFieldChange.Field:type_name -> tsdb.Field + 0, // 4: tsdb.MeasurementFieldChange.Change:type_name -> tsdb.ChangeType + 6, // 5: tsdb.FieldChangeSet.Changes:type_name -> tsdb.MeasurementFieldChange + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_internal_fieldsindex_proto_init() } @@ -413,19 +593,44 @@ func file_internal_fieldsindex_proto_init() { return nil } } + file_internal_fieldsindex_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MeasurementFieldChange); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_fieldsindex_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FieldChangeSet); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_fieldsindex_proto_rawDesc, - NumEnums: 0, - NumMessages: 5, + NumEnums: 1, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, GoTypes: file_internal_fieldsindex_proto_goTypes, DependencyIndexes: file_internal_fieldsindex_proto_depIdxs, + EnumInfos: file_internal_fieldsindex_proto_enumTypes, MessageInfos: file_internal_fieldsindex_proto_msgTypes, }.Build() File_internal_fieldsindex_proto = out.File diff --git a/tsdb/internal/fieldsindex.proto b/tsdb/internal/fieldsindex.proto index 6f2b962f1c..2e4c80cf0b 100644 --- a/tsdb/internal/fieldsindex.proto +++ b/tsdb/internal/fieldsindex.proto @@ -32,3 +32,18 @@ message Field { message MeasurementFieldSet { repeated MeasurementFields Measurements = 1; } + +enum ChangeType { + AddMeasurementField = 0; + DeleteMeasurement = 1; +} + +message MeasurementFieldChange { + bytes Measurement = 1; + Field Field = 2; + ChangeType Change = 3; +} + +message FieldChangeSet { + repeated MeasurementFieldChange Changes = 1; +} \ No newline at end of file diff --git a/tsdb/shard.go b/tsdb/shard.go index 2702888a53..c7427db41e 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -3,6 +3,7 @@ package tsdb import ( "bytes" "context" + "encoding/binary" "errors" "fmt" "io" @@ -19,8 +20,10 @@ import ( "unsafe" "github.com/influxdata/influxdb/v2/influxql/query" + "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/bytesutil" + errors2 "github.com/influxdata/influxdb/v2/pkg/errors" "github.com/influxdata/influxdb/v2/pkg/estimator" "github.com/influxdata/influxdb/v2/pkg/file" "github.com/influxdata/influxdb/v2/pkg/limiter" @@ -35,10 +38,11 @@ import ( const ( measurementKey = "_name" DefaultMetricInterval = 10 * time.Second + FieldsChangeFile = "fields.idxl" + bytesInInt64 = 8 ) var ( - // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") @@ -870,14 +874,19 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error } // add fields + changes := make([]*FieldChange, 0, len(fieldsToCreate)) for _, f := range fieldsToCreate { mf := engine.MeasurementFields(f.Measurement) if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { return err } + changes = append(changes, &FieldChange{ + FieldCreate: *f, + ChangeType: AddMeasurementField, + }) } - return engine.MeasurementFieldSet().Save() + return engine.MeasurementFieldSet().Save(changes) } // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) @@ -1917,33 +1926,65 @@ func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataT } } +type FieldChanges []*FieldChange + +func MeasurementsToFieldChangeDeletions(measurements []string) FieldChanges { + fcs := make([]*FieldChange, 0, len(measurements)) + for _, m := range measurements { + fcs = append(fcs, &FieldChange{ + FieldCreate: FieldCreate{ + Measurement: []byte(m), + Field: nil, + }, + ChangeType: DeleteMeasurement, + }) + } + return fcs +} + // MeasurementFieldSet represents a collection of fields by measurement. // This safe for concurrent use. type MeasurementFieldSet struct { mu sync.RWMutex fields map[string]*MeasurementFields // path is the location to persist field sets - path string - writer *MeasurementFieldSetWriter + path string + changeMgr *measurementFieldSetChangeMgr } // NewMeasurementFieldSet returns a new instance of MeasurementFieldSet. -func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) { +func NewMeasurementFieldSet(path string, logger *zap.Logger) (*MeasurementFieldSet, error) { const MaxCombinedWrites = 100 fs := &MeasurementFieldSet{ fields: make(map[string]*MeasurementFields), path: path, } - fs.SetMeasurementFieldSetWriter(MaxCombinedWrites) + if nil == logger { + logger = zap.NewNop() + } + fs.SetMeasurementFieldSetWriter(MaxCombinedWrites, logger) // If there is a load error, return the error and an empty set so // it can be rebuild manually. return fs, fs.load() } -func (fs *MeasurementFieldSet) Close() { - if fs != nil && fs.writer != nil { - fs.writer.Close() +func (fs *MeasurementFieldSet) Close() error { + if fs != nil && fs.changeMgr != nil { + fs.changeMgr.Close() + // If there is a change log file, save the in-memory version + if _, err := os.Stat(fs.changeMgr.changeFilePath); err == nil { + return fs.WriteToFile() + } else if os.IsNotExist(err) { + return nil + } else { + return fmt.Errorf("cannot get file information for %s: %w", fs.changeMgr.changeFilePath, err) + } } + return nil +} + +func (fs *MeasurementFieldSet) ChangesPath() string { + return fs.changeMgr.changeFilePath } // Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes. @@ -2014,7 +2055,7 @@ func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *Measurement // Delete removes a field set for a measurement. func (fs *MeasurementFieldSet) Delete(name string) { fs.mu.Lock() - delete(fs.fields, name) + fs.deleteNoLock(name) fs.mu.Unlock() } @@ -2027,10 +2068,15 @@ func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) erro return err } - delete(fs.fields, name) + fs.deleteNoLock(name) return nil } +// deleteNoLock removes a field set for a measurement +func (fs *MeasurementFieldSet) deleteNoLock(name string) { + delete(fs.fields, name) +} + func (fs *MeasurementFieldSet) IsEmpty() bool { fs.mu.RLock() defer fs.mu.RUnlock() @@ -2040,154 +2086,247 @@ func (fs *MeasurementFieldSet) IsEmpty() bool { type errorChannel chan<- error type writeRequest struct { - done errorChannel + errorReturn chan<- error + changes FieldChanges } -type MeasurementFieldSetWriter struct { - wg sync.WaitGroup - writeRequests chan writeRequest +type measurementFieldSetChangeMgr struct { + mu sync.Mutex + wg sync.WaitGroup + writeRequests chan writeRequest + changeFilePath string + logger *zap.Logger + changeFileSize int64 } // SetMeasurementFieldSetWriter - initialize the queue for write requests // and start the background write process -func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int) { +func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int, logger *zap.Logger) { fs.mu.Lock() defer fs.mu.Unlock() - fs.writer = &MeasurementFieldSetWriter{writeRequests: make(chan writeRequest, queueLength)} - fs.writer.wg.Add(1) - go fs.saveWriter() + fs.changeMgr = &measurementFieldSetChangeMgr{ + writeRequests: make(chan writeRequest, queueLength), + changeFilePath: filepath.Join(filepath.Dir(fs.path), FieldsChangeFile), + logger: logger, + changeFileSize: int64(0), + } + fs.changeMgr.wg.Add(1) + go fs.changeMgr.SaveWriter() } -func (w *MeasurementFieldSetWriter) Close() { - if w != nil { - close(w.writeRequests) - w.wg.Wait() +func (fscm *measurementFieldSetChangeMgr) Close() { + if fscm != nil { + close(fscm.writeRequests) + fscm.wg.Wait() } } -func (fs *MeasurementFieldSet) Save() error { - return fs.writer.RequestSave() +func (fs *MeasurementFieldSet) Save(changes FieldChanges) error { + return fs.changeMgr.RequestSave(changes) } -func (w *MeasurementFieldSetWriter) RequestSave() error { +func (fscm *measurementFieldSetChangeMgr) RequestSave(changes FieldChanges) error { done := make(chan error) - wr := writeRequest{done: done} - w.writeRequests <- wr + fscm.writeRequests <- writeRequest{errorReturn: done, changes: changes} return <-done } -func (fs *MeasurementFieldSet) saveWriter() { - defer fs.writer.wg.Done() - // Block until someone modifies the MeasurementFieldSet and - // it needs to be written to disk. - for req, ok := <-fs.writer.writeRequests; ok; req, ok = <-fs.writer.writeRequests { - fs.writeToFile(req) +func (fscm *measurementFieldSetChangeMgr) SaveWriter() { + defer fscm.wg.Done() + // Block until someone modifies the MeasurementFieldSet, and + // it needs to be written to disk. Exit when the channel is closed + for wr, ok := <-fscm.writeRequests; ok; wr, ok = <-fscm.writeRequests { + fscm.appendToChangesFile(wr) } } -// writeToFile: Write the new index to a temp file and rename when it's sync'd -func (fs *MeasurementFieldSet) writeToFile(first writeRequest) { - var err error - // Put the errorChannel on which we blocked into a slice to allow more invocations - // to share the return code from the file write - errorChannels := []errorChannel{first.done} - defer func() { - for _, c := range errorChannels { - c <- err - close(c) - } - }() - // Do some blocking IO operations before marshalling the in-memory index - // to allow other changes to it to be queued up and be captured in one - // write operation, in case we are under heavy field creation load +// WriteToFile: Write the new index to a temp file and rename when it's sync'd +// This locks the MeasurementFieldSet during the marshaling, the write, and the rename. +func (fs *MeasurementFieldSet) WriteToFile() error { path := fs.path + ".tmp" // Open the temp file fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) if err != nil { - return + return fmt.Errorf("failed opening %s: %w", fs.path, err) } // Ensure temp file is cleaned up defer func() { - if e := os.RemoveAll(path); err == nil { - err = e + if e := os.RemoveAll(path); err == nil && e != nil { + err = fmt.Errorf("failed removing temporary file %s: %w", path, e) + } + if e := os.RemoveAll(fs.changeMgr.changeFilePath); err == nil && e != nil { + err = fmt.Errorf("failed removing saved field changes - %s: %w", fs.changeMgr.changeFilePath, e) } }() + fs.mu.RLock() + defer fs.mu.RUnlock() isEmpty, err := func() (isEmpty bool, err error) { // ensure temp file closed before rename (for Windows) defer func() { - if e := fd.Close(); err == nil { - err = e + if e := fd.Close(); err == nil && e != nil { + err = fmt.Errorf("closing %s: %w", path, e) } }() if _, err = fd.Write(fieldsIndexMagicNumber); err != nil { - return true, err + return true, fmt.Errorf("failed writing magic number for %s: %w", path, err) } - // Read all the pending new field and measurement write requests - // that will be captured in the marshaling of the in-memory copy - for { - select { - case ec := <-fs.writer.writeRequests: - errorChannels = append(errorChannels, ec.done) - continue - default: - } - break - } // Lock, copy, and marshal the in-memory index - b, err := fs.marshalMeasurementFieldSet() + b, err := fs.marshalMeasurementFieldSetNoLock() if err != nil { - return true, err + return true, fmt.Errorf("failed marshaling fields for %s: %w", fs.path, err) } if b == nil { // No fields, file removed, all done return true, nil } if _, err := fd.Write(b); err != nil { - return true, err + return true, fmt.Errorf("failed saving fields to %s: %w", path, err) } - return false, fd.Sync() + return false, nil }() - if err != nil || isEmpty { + if err != nil { + return err + } else if isEmpty { + // remove empty file + if err = os.RemoveAll(fs.path); err != nil { + return fmt.Errorf("cannot remove %s: %w", fs.path, err) + } else { + return nil + } + } + + return fs.renameFileNoLock(path) +} + +// appendToChangesFile: Write a change file for fields.idx +// Only called in one Go proc, so does not need locking. +func (fscm *measurementFieldSetChangeMgr) appendToChangesFile(first writeRequest) { + var err error = nil + // Put the errorChannel on which we blocked into a slice to allow more invocations + // to share the return code from the file write + errorChannels := []errorChannel{first.errorReturn} + changes := []FieldChanges{first.changes} + // On return, send the error to every go proc that send changes + defer func() { + for _, c := range errorChannels { + c <- err + close(c) + } + }() + log, end := logger.NewOperation(context.TODO(), fscm.logger, "saving field index changes", "MeasurementFieldSet") + defer end() + // Do some blocking IO operations before marshalling the changes, + // to allow other changes to be queued up and be captured in one + // write operation, in case we are under heavy field creation load + fscm.mu.Lock() + defer fscm.mu.Unlock() + fd, err := os.OpenFile(fscm.changeFilePath, os.O_CREATE|os.O_APPEND|os.O_SYNC|os.O_WRONLY, 0666) + if err != nil { + err = fmt.Errorf("opening %s: %w", fscm.changeFilePath, err) + log.Error("failed", zap.Error(err)) return } - err = fs.renameFile(path) -} -// marshalMeasurementFieldSet: remove the fields.idx file if no fields -// otherwise, copy the in-memory version into a protobuf to write to -// disk -func (fs *MeasurementFieldSet) marshalMeasurementFieldSet() ([]byte, error) { - fs.mu.Lock() - defer fs.mu.Unlock() - if len(fs.fields) == 0 { - // If no fields left, remove the fields index file - if err := os.RemoveAll(fs.path); err != nil { - return nil, err + // ensure file closed + defer errors2.Capture(&err, func() error { + if e := fd.Close(); e != nil { + e = fmt.Errorf("closing %s: %w", fd.Name(), e) + log.Error("failed", zap.Error(e)) + return e } else { - return nil, nil + return nil + } + })() + + var fi os.FileInfo + if fi, err = fd.Stat(); err != nil { + err = fmt.Errorf("unable to get size of %s: %w", fd.Name(), err) + log.Error("failed", zap.Error(err)) + return + } else if fi.Size() > fscm.changeFileSize { + // If we had a partial write last time, truncate the file to remove it. + if err = fd.Truncate(fscm.changeFileSize); err != nil { + err = fmt.Errorf("cannot truncate %s to last known good size of %d after incomplete write: %w", fd.Name(), fscm.changeFileSize, err) + log.Error("failed", zap.Error(err)) + return } } - return fs.marshalMeasurementFieldSetNoLock() -} -func (fs *MeasurementFieldSet) renameFile(path string) error { - fs.mu.Lock() - defer fs.mu.Unlock() - - if err := file.RenameFile(path, fs.path); err != nil { - return err + // Read all the pending field and measurement write or delete + // requests + for { + select { + case wr := <-fscm.writeRequests: + changes = append(changes, wr.changes) + errorChannels = append(errorChannels, wr.errorReturn) + continue + default: + } + break + } + // marshal the slice of slices of field changes in size-prefixed protobuf + var b []byte + b, err = marshalFieldChanges(changes...) + if err != nil { + err = fmt.Errorf("error marshaling changes for %s: %w", fd.Name(), err) + log.Error("failed", zap.Error(err)) + return } - if err := file.SyncDir(filepath.Dir(fs.path)); err != nil { - return err + if _, err = fd.Write(b); err != nil { + err = fmt.Errorf("failed writing to %s: %w", fd.Name(), err) + log.Error("failed", zap.Error(err)) + return + } else if fi, err = fd.Stat(); err != nil { + err = fmt.Errorf("unable to get final size of %s after appendation: %w", fd.Name(), err) + log.Error("failed", zap.Error(err)) + return + } else { + fscm.changeFileSize = fi.Size() + } +} + +func readSizePlusBuffer(r io.Reader, b []byte) ([]byte, error) { + var numBuf [bytesInInt64]byte + + if _, err := r.Read(numBuf[:]); err != nil { + return nil, err + } + size := int(binary.LittleEndian.Uint64(numBuf[:])) + if cap(b) < size { + b = make([]byte, size) + } + _, err := io.ReadAtLeast(r, b, size) + if err != nil { + return nil, err + } + return b, nil +} + +func (fs *MeasurementFieldSet) renameFileNoLock(path string) error { + if err := file.RenameFile(path, fs.path); err != nil { + return fmt.Errorf("cannot rename %s to %s: %w", path, fs.path, err) + } + + dir := filepath.Dir(fs.path) + if err := file.SyncDir(dir); err != nil { + return fmt.Errorf("cannot sync directory %s: %w", dir, err) } return nil } +// marshalMeasurementFieldSetNoLock: remove the fields.idx file if no fields +// otherwise, copy the in-memory version into a protobuf to write to +// disk func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled []byte, err error) { + if len(fs.fields) == 0 { + // If no fields left, remove the fields index file + return nil, nil + } + pb := internal.MeasurementFieldSet{ Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)), } @@ -2213,48 +2352,202 @@ func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled [] } } -func (fs *MeasurementFieldSet) load() error { - fs.mu.Lock() - defer fs.mu.Unlock() +func marshalFieldChanges(changeSet ...FieldChanges) ([]byte, error) { + fcs := internal.FieldChangeSet{ + Changes: nil, + } + for _, fc := range changeSet { + for _, f := range fc { + mfc := &internal.MeasurementFieldChange{ + Measurement: f.Measurement, + Change: internal.ChangeType(f.ChangeType), + } + if f.Field != nil { + mfc.Field = &internal.Field{ + Name: []byte(f.Field.Name), + Type: int32(f.Field.Type), + } + fcs.Changes = append(fcs.Changes, mfc) + } + } + } + mo := proto.MarshalOptions{} + var numBuf [bytesInInt64]byte + + b, err := mo.MarshalAppend(numBuf[:], &fcs) + binary.LittleEndian.PutUint64(b[0:bytesInInt64], uint64(len(b)-bytesInInt64)) + + if err != nil { + fields := make([]string, 0, len(fcs.Changes)) + for _, fc := range changeSet { + for _, f := range fc { + fields = append(fields, fmt.Sprintf("%q.%q", f.Measurement, f.Field.Name)) + } + } + return nil, fmt.Errorf("failed marshaling new fields - %s: %w", strings.Join(fields, ", "), err) + } + return b, nil +} + +func (fs *MeasurementFieldSet) load() (rErr error) { + err := func() error { + fs.mu.Lock() + defer fs.mu.Unlock() + + pb, err := fs.loadParseFieldIndexPB() + if err != nil { + return err + } + fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) + for _, measurement := range pb.GetMeasurements() { + fields := make(map[string]*Field, len(measurement.GetFields())) + for _, field := range measurement.GetFields() { + fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())} + } + set := &MeasurementFields{} + set.fields.Store(fields) + fs.fields[string(measurement.GetName())] = set + } + return nil + }() + + if err != nil { + return fmt.Errorf("failed loading field indices: %w", err) + } + return fs.ApplyChanges() +} + +func (fs *MeasurementFieldSet) loadParseFieldIndexPB() (pb *internal.MeasurementFieldSet, rErr error) { + pb = &internal.MeasurementFieldSet{} fd, err := os.Open(fs.path) if os.IsNotExist(err) { - return nil + return pb, nil } else if err != nil { - return err + err = fmt.Errorf("failed opening %s: %w", fs.path, err) + return nil, err } - defer fd.Close() + + defer errors2.Capture(&rErr, func() error { + if e := fd.Close(); e != nil { + return fmt.Errorf("failed closing %s: %w", fd.Name(), e) + } else { + return nil + } + })() var magic [4]byte if _, err := fd.Read(magic[:]); err != nil { - return err + err = fmt.Errorf("failed reading %s: %w", fs.path, err) + return nil, err } if !bytes.Equal(magic[:], fieldsIndexMagicNumber) { - return ErrUnknownFieldsFormat + return nil, fmt.Errorf("%q: %w", fs.path, ErrUnknownFieldsFormat) } - var pb internal.MeasurementFieldSet b, err := io.ReadAll(fd) + if err != nil { + err = fmt.Errorf("failed reading %s: %w", fs.path, err) + return nil, err + } + if err = proto.Unmarshal(b, pb); err != nil { + err = fmt.Errorf("failed unmarshaling %s: %w", fs.path, err) + return nil, err + } + return pb, err +} + +func (fscm *measurementFieldSetChangeMgr) loadAllFieldChanges(log *zap.Logger) (changes []FieldChanges, rErr error) { + var fcs FieldChanges + + fscm.mu.Lock() + defer fscm.mu.Unlock() + fd, err := os.Open(fscm.changeFilePath) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + err = fmt.Errorf("failed opening %s: %w", fscm.changeFilePath, err) + log.Error("field index file of changes", zap.Error(err)) + return nil, err + } + defer errors2.Capture(&rErr, func() error { + if e := fd.Close(); e != nil { + return fmt.Errorf("failed closing %s: %w", fd.Name(), e) + } else { + return nil + } + })() + for fcs, err = fscm.loadFieldChangeSet(fd); err == nil; fcs, err = fscm.loadFieldChangeSet(fd) { + changes = append(changes, fcs) + } + if errors.Is(err, io.EOF) { + return changes, nil + } else if errors.Is(err, io.ErrUnexpectedEOF) { + log.Warn("last entry was an incomplete write", zap.Error(err)) + return changes, nil + } else { + log.Error("field index file of changes", zap.Error(err)) + return nil, err + } +} + +func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (FieldChanges, error) { + var pb internal.FieldChangeSet + + b, err := readSizePlusBuffer(r, nil) + if err != nil { + return nil, fmt.Errorf("failed reading %s: %w", fscm.changeFilePath, err) + } + if err := proto.Unmarshal(b, &pb); err != nil { + return nil, fmt.Errorf("failed unmarshalling %s: %w", fscm.changeFilePath, err) + } + + fcs := make([]*FieldChange, 0, len(pb.Changes)) + + for _, fc := range pb.Changes { + fcs = append(fcs, &FieldChange{ + FieldCreate: FieldCreate{ + Measurement: fc.Measurement, + Field: &Field{ + ID: 0, + Name: string(fc.Field.Name), + Type: influxql.DataType(fc.Field.Type), + }, + }, + ChangeType: ChangeType(fc.Change), + }) + } + return fcs, nil +} + +func (fs *MeasurementFieldSet) ApplyChanges() error { + log, end := logger.NewOperation(context.TODO(), fs.changeMgr.logger, "failed loading changes", "field indices") + defer end() + changes, err := fs.changeMgr.loadAllFieldChanges(log) if err != nil { return err } - - if err := proto.Unmarshal(b, &pb); err != nil { - return err + if len(changes) <= 0 { + return os.RemoveAll(fs.changeMgr.changeFilePath) } - fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) - for _, measurement := range pb.GetMeasurements() { - fields := make(map[string]*Field, len(measurement.GetFields())) - for _, field := range measurement.GetFields() { - fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())} + for _, fcs := range changes { + for _, fc := range fcs { + if fc.ChangeType == DeleteMeasurement { + fs.Delete(string(fc.Measurement)) + } else { + mf := fs.CreateFieldsIfNotExists(fc.Measurement) + if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil { + err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err) + log.Error("field creation", zap.Error(err)) + return err + } + } } - set := &MeasurementFields{} - set.fields.Store(fields) - fs.fields[string(measurement.GetName())] = set } - return nil + return fs.WriteToFile() } // Field represents a series field. All of the fields must be hashable. @@ -2264,6 +2557,18 @@ type Field struct { Type influxql.DataType `json:"type,omitempty"` } +type FieldChange struct { + FieldCreate + ChangeType ChangeType +} + +type ChangeType int + +const ( + AddMeasurementField = ChangeType(internal.ChangeType_AddMeasurementField) + DeleteMeasurement = ChangeType(internal.ChangeType_DeleteMeasurement) +) + // NewFieldKeysIterator returns an iterator that can be iterated over to // retrieve field keys. func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index ad1105a4ee..98810bdb46 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -3,7 +3,11 @@ package tsdb_test import ( "bytes" "context" + "errors" "fmt" + "github.com/davecgh/go-spew/spew" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "math" "os" "path/filepath" @@ -16,18 +20,16 @@ import ( "testing" "time" - "github.com/davecgh/go-spew/spew" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/internal" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/deep" + "github.com/influxdata/influxdb/v2/pkg/testing/assert" "github.com/influxdata/influxdb/v2/tsdb" _ "github.com/influxdata/influxdb/v2/tsdb/engine" _ "github.com/influxdata/influxdb/v2/tsdb/index" "github.com/influxdata/influxql" - "github.com/stretchr/testify/assert" + assert2 "github.com/stretchr/testify/assert" ) func TestShardWriteAndIndex(t *testing.T) { @@ -1518,31 +1520,48 @@ _reserved,region=uswest value="foo" 0 } func TestMeasurementFieldSet_SaveLoad(t *testing.T) { + const measurement = "cpu" + const fieldName = "value" + dir, cleanup := MustTempDir() defer cleanup() path := filepath.Join(dir, "fields.idx") - mf, err := tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mf.Close() - fields := mf.CreateFieldsIfNotExists([]byte("cpu")) - if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { + defer checkMeasurementFieldSetClose(t, mf) + fields := mf.CreateFieldsIfNotExists([]byte(measurement)) + if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } - - if err := mf.Save(); err != nil { - t.Fatalf("save error: %v", err) + change := tsdb.FieldChange{ + FieldCreate: tsdb.FieldCreate{ + Measurement: []byte(measurement), + Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + }, + ChangeType: tsdb.AddMeasurementField, } - mf2, err := tsdb.NewMeasurementFieldSet(path) + if err := mf.Save(tsdb.FieldChanges{&change}); err != nil { + t.Fatalf("save error: %v", err) + } + _, err = os.Stat(mf.ChangesPath()) + assert.NoError(t, err, "no field.idx change file") + + mf2, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mf2.Close() - fields = mf2.FieldsByString("cpu") - field := fields.Field("value") + _, err = os.Stat(mf.ChangesPath()) + assert2.Error(t, err, "file %s should have had this error: %s", mf.ChangesPath(), os.ErrNotExist) + if !os.IsNotExist(err) { + t.Fatalf("unexpected error for %s: got %s, expected %s", mf.ChangesPath(), err, os.ErrNotExist) + } + defer checkMeasurementFieldSetClose(t, mf2) + fields = mf2.FieldsByString(measurement) + field := fields.Field(fieldName) if field == nil { t.Fatalf("field is null") } @@ -1558,17 +1577,26 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) { path := filepath.Join(dir, "fields.idx") func() { - mf, err := tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mf.Close() - fields := mf.CreateFieldsIfNotExists([]byte("cpu")) - if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { + defer checkMeasurementFieldSetClose(t, mf) + measurement := []byte("cpu") + fields := mf.CreateFieldsIfNotExists(measurement) + fieldName := "value" + if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } + change := tsdb.FieldChange{ + FieldCreate: tsdb.FieldCreate{ + Measurement: []byte(measurement), + Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + }, + ChangeType: tsdb.AddMeasurementField, + } - if err := mf.Save(); err != nil { + if err := mf.Save(tsdb.FieldChanges{&change}); err != nil { t.Fatalf("save error: %v", err) } }() @@ -1580,42 +1608,137 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) { if err := os.Truncate(path, stat.Size()-3); err != nil { t.Fatalf("truncate error: %v", err) } - mf, err := tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path, nil) if err == nil { t.Fatal("NewMeasurementFieldSet expected error") } - defer mf.Close() + defer checkMeasurementFieldSetClose(t, mf) fields := mf.FieldsByString("cpu") if fields != nil { t.Fatal("expecte fields to be nil") } } + +func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) { + dir, cleanup := MustTempDir() + defer cleanup() + + testFields := []struct { + Measurement string + Field string + FieldType influxql.DataType + }{ + { + Measurement: "cpu", + Field: "value_1", + FieldType: influxql.Float, + }, + { + Measurement: "cpu", + Field: "value_2", + FieldType: influxql.String, + }, + { + Measurement: "cpu", + Field: "value_3", + FieldType: influxql.Integer, + }, + } + + path := filepath.Join(dir, "fields.idx") + var mf *tsdb.MeasurementFieldSet + var err error + mf, err = tsdb.NewMeasurementFieldSet(path, nil) + if err != nil { + t.Fatalf("NewMeasurementFieldSet error: %v", err) + } + defer checkMeasurementFieldSetClose(t, mf) + for _, f := range testFields { + fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement)) + if err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil { + t.Fatalf("create field error: %v", err) + } + change := tsdb.FieldChange{ + FieldCreate: tsdb.FieldCreate{ + Measurement: []byte(f.Measurement), + Field: &tsdb.Field{ID: 0, Name: f.Field, Type: f.FieldType}, + }, + ChangeType: tsdb.AddMeasurementField, + } + + if err := mf.Save(tsdb.FieldChanges{&change}); err != nil { + t.Fatalf("save error: %v", err) + } + } + changeFile := filepath.Join(dir, tsdb.FieldsChangeFile) + stat, err := os.Stat(changeFile) + if err != nil { + t.Fatalf("stat error: %v", err) + } + // Truncate the file to simulate a corrupted file + if err := os.Truncate(changeFile, stat.Size()-3); err != nil { + t.Fatalf("truncate error: %v", err) + } + mf2, err := tsdb.NewMeasurementFieldSet(path, nil) + assert.NoError(t, err, "failed creating second MeasurementFieldSet") + defer checkMeasurementFieldSetClose(t, mf2) + + for i := 0; i < len(testFields)-1; i++ { + fields := mf2.FieldsByString(testFields[i].Measurement) + if fields == nil { + t.Fatalf("nil fields map for %s", testFields[i].Measurement) + } else if f := fields.Field(testFields[i].Field); f == nil { + t.Fatalf("%s not found in %s fields", testFields[i].Field, testFields[i].Measurement) + } else if f.Type != testFields[i].FieldType { + t.Fatalf("%s.%s wrong type: expected %v, got %v", testFields[i].Measurement, testFields[i].Field, testFields[i].FieldType, f.Type) + } + } + i := len(testFields) - 1 + fields := mf2.FieldsByString(testFields[i].Measurement) + if fields == nil { + t.Fatalf("nil fields map for %s", testFields[i].Measurement) + } else if f := fields.Field(testFields[i].Field); f != nil { + t.Fatalf("%s found in %s fields, should have not been present", testFields[i].Field, testFields[i].Measurement) + } +} + func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { + const measurement = "cpu" + const fieldName = "value" + dir, cleanup := MustTempDir() defer cleanup() path := filepath.Join(dir, "fields.idx") - mf, err := tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mf.Close() - fields := mf.CreateFieldsIfNotExists([]byte("cpu")) - if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil { + defer checkMeasurementFieldSetClose(t, mf) + + fields := mf.CreateFieldsIfNotExists([]byte(measurement)) + if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } - if err := mf.Save(); err != nil { + change := tsdb.FieldChange{ + FieldCreate: tsdb.FieldCreate{ + Measurement: []byte(measurement), + Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + }, + ChangeType: tsdb.AddMeasurementField, + } + + if err := mf.Save(tsdb.FieldChanges{&change}); err != nil { t.Fatalf("save error: %v", err) } - mf2, err := tsdb.NewMeasurementFieldSet(path) + mf2, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mf2.Close() - fields = mf2.FieldsByString("cpu") - field := fields.Field("value") + fields = mf2.FieldsByString(measurement) + field := fields.Field(fieldName) if field == nil { t.Fatalf("field is null") } @@ -1624,17 +1747,35 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { t.Fatalf("field type mismatch: got %v, exp %v", got, exp) } - mf2.Delete("cpu") + mf2.Delete(measurement) - if err := mf2.Save(); err != nil { + if err := mf2.Save(tsdb.MeasurementsToFieldChangeDeletions([]string{measurement})); err != nil { t.Fatalf("save after delete error: %v", err) } + _, err = os.Stat(mf.ChangesPath()) + assert.NoError(t, err, "no field.idx change file") + assert.NoError(t, mf2.Close(), "failed closing MeasurementFieldSet") - if _, err := os.Stat(path); !os.IsNotExist(err) { + _, err = os.Stat(mf.ChangesPath()) + assert2.Error(t, err, "file %s should have had this error: %s", mf.ChangesPath(), os.ErrNotExist) + if !os.IsNotExist(err) { + t.Fatalf("unexpected error for %s: got %s, expected %s", mf.ChangesPath(), err, os.ErrNotExist) + } + + if _, err = os.Stat(path); !os.IsNotExist(err) { t.Fatalf("got %v, not exist err", err) } } +func checkMeasurementFieldSetClose(t *testing.T, fs *tsdb.MeasurementFieldSet) { + assert.NoError(t, fs.Close(), "failed closing tsdb.MeasurementFieldSet") + _, err := os.Stat(fs.ChangesPath()) + assert2.Error(t, err, "file %s should have had this error: %s", fs.ChangesPath(), os.ErrNotExist) + if !os.IsNotExist(err) { + t.Fatalf("unexpected error for %s: got %s, expected %s", fs.ChangesPath(), err, os.ErrNotExist) + } +} + func TestMeasurementFieldSet_InvalidFormat(t *testing.T) { dir, cleanup := MustTempDir() defer cleanup() @@ -1645,11 +1786,11 @@ func TestMeasurementFieldSet_InvalidFormat(t *testing.T) { t.Fatalf("error writing fields.index: %v", err) } - mf, err := tsdb.NewMeasurementFieldSet(path) - if err != tsdb.ErrUnknownFieldsFormat { + mf, err := tsdb.NewMeasurementFieldSet(path, nil) + if !errors.Is(err, tsdb.ErrUnknownFieldsFormat) { t.Fatalf("unexpected error: got %v, exp %v", err, tsdb.ErrUnknownFieldsFormat) } - defer mf.Close() + defer checkMeasurementFieldSetClose(t, mf) } func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { @@ -1673,11 +1814,11 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { } path := filepath.Join(dir, "fields.idx") - mfs, err := tsdb.NewMeasurementFieldSet(path) + mfs, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mfs.Close() + defer checkMeasurementFieldSetClose(t, mfs) var wg sync.WaitGroup wg.Add(len(ft)) @@ -1686,11 +1827,11 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { } wg.Wait() - mfs2, err := tsdb.NewMeasurementFieldSet(path) + mfs2, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } - defer mfs2.Close() + defer checkMeasurementFieldSetClose(t, mfs2) for i, fs := range ft { mf := mfs.Fields([]byte(mt[i])) mf2 := mfs2.Fields([]byte(mt[i])) @@ -1708,7 +1849,7 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) { func TestMeasurementFieldSet_MeasurementNames(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "fields.idx") - mf, err := tsdb.NewMeasurementFieldSet(path) + mf, err := tsdb.NewMeasurementFieldSet(path, nil) if err != nil { t.Fatalf("NewMeasurementFieldSet error: %v", err) } @@ -1726,15 +1867,31 @@ func TestMeasurementFieldSet_MeasurementNames(t *testing.T) { func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldSet, measurement string, fieldNames []string) { defer wg.Done() fields := mf.CreateFieldsIfNotExists([]byte(measurement)) + for _, fieldName := range fieldNames { if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { t.Errorf("create field error: %v", err) return } - if err := mf.Save(); err != nil { - t.Errorf("save error: %v", err) + change := tsdb.FieldChange{ + FieldCreate: tsdb.FieldCreate{ + Measurement: []byte(measurement), + Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + }, + ChangeType: tsdb.AddMeasurementField, + } + + err := mf.Save(tsdb.FieldChanges{&change}) + if err != nil { + t.Logf("save error: %v", err) + t.Fail() return } + _, err = os.Stat(mf.ChangesPath()) + if err != nil { + t.Logf("unexpected error for field.idxl change file %s: %s", mf.ChangesPath(), err) + t.Fail() + } } }