feat: optimize saving changes to fields.idx (#23701) (#23728)

Instead of writing out the complete fields.idx
file every time it changes, append incremental
changes to a change log (fields.idxl) that is
applied back to fields.idx on close and at startup.

closes https://github.com/influxdata/influxdb/issues/23653

(cherry picked from commit 80c10c8c04)

closes https://github.com/influxdata/influxdb/issues/23703
davidby-influx 2022-09-15 12:15:14 -07:00 committed by GitHub
parent 91623ddc21
commit b72848d436
5 changed files with 880 additions and 194 deletions
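For orientation before the diffs: each call to Save now appends a length-prefixed record to a change log (fields.idxl) kept next to fields.idx, and that log is replayed and folded back into fields.idx when the shard is opened or closed. Below is a minimal sketch of the append-then-replay idea, assuming the 8-byte little-endian length prefix used later in shard.go; appendChange and replayChanges are illustrative names, not this commit's API.

// Sketch only: illustrates the change-log approach, not the code in this commit.
package sketch

import (
    "encoding/binary"
    "errors"
    "io"
    "os"
)

// appendChange appends one length-prefixed record to the change log.
func appendChange(path string, record []byte) error {
    f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY|os.O_SYNC, 0666)
    if err != nil {
        return err
    }
    defer f.Close()
    buf := make([]byte, 8, 8+len(record))
    binary.LittleEndian.PutUint64(buf, uint64(len(record)))
    buf = append(buf, record...)
    _, err = f.Write(buf)
    return err
}

// replayChanges reads records until EOF and hands each one to apply.
// A short read at the tail is treated as an incomplete final write and ignored.
func replayChanges(path string, apply func([]byte)) error {
    f, err := os.Open(path)
    if os.IsNotExist(err) {
        return nil
    } else if err != nil {
        return err
    }
    defer f.Close()
    for {
        var size [8]byte
        if _, err := io.ReadFull(f, size[:]); err != nil {
            if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
                return nil
            }
            return err
        }
        record := make([]byte, binary.LittleEndian.Uint64(size[:]))
        if _, err := io.ReadFull(f, record); err != nil {
            if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
                return nil // truncated trailing record
            }
            return err
        }
        apply(record)
    }
}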

@ -719,7 +719,7 @@ func (e *Engine) Open(ctx context.Context) error {
return err
}
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"))
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"), e.logger)
if err != nil {
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
}
@ -763,15 +763,18 @@ func (e *Engine) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
e.done = nil // Ensures that the channel will not be closed again.
e.fieldset.Close()
if err := e.FileStore.Close(); err != nil {
return err
var err error = nil
err = e.fieldset.Close()
if err2 := e.FileStore.Close(); err2 != nil && err == nil {
err = err2
}
if e.WALEnabled {
return e.WAL.Close()
if err2 := e.WAL.Close(); err2 != nil && err == nil {
err = err2
}
}
return nil
return err
}
// WithLogger sets the logger for the engine.
@ -868,7 +871,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
}
// Save the field set index so we don't have to rebuild it next time
if err := e.fieldset.Save(); err != nil {
if err := e.fieldset.WriteToFile(); err != nil {
return err
}
@ -1170,7 +1173,7 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
return err
}
}
return nil
return e.MeasurementFieldSet().WriteToFile()
}
// readFileFromBackup copies the next file from the archive into the shard.
@ -1703,19 +1706,20 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min
ids.Add(sid)
}
fielsetChanged := false
actuallyDeleted := make([]string, 0, len(measurements))
for k := range measurements {
if dropped, err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil {
return err
} else if dropped {
if err := e.cleanupMeasurement([]byte(k)); err != nil {
if deleted, err := e.cleanupMeasurement([]byte(k)); err != nil {
return err
} else if deleted {
actuallyDeleted = append(actuallyDeleted, k)
}
fielsetChanged = true
}
}
if fielsetChanged {
if err := e.fieldset.Save(); err != nil {
if len(actuallyDeleted) > 0 {
if err := e.fieldset.Save(tsdb.MeasurementsToFieldChangeDeletions(actuallyDeleted)); err != nil {
return err
}
}
@ -1745,7 +1749,7 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min
return nil
}
func (e *Engine) cleanupMeasurement(name []byte) error {
func (e *Engine) cleanupMeasurement(name []byte) (deleted bool, err error) {
// A sentinel error message to cause DeleteWithLock to not delete the measurement
abortErr := fmt.Errorf("measurements still exist")
@ -1776,10 +1780,10 @@ func (e *Engine) cleanupMeasurement(name []byte) error {
}); err != nil && err != abortErr {
// Something else failed, return it
return err
return false, err
}
return nil
return err != abortErr, nil
}
// DeleteMeasurement deletes a measurement and all related series.
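The reworked Engine.Close above closes the field set, the file store, and (when enabled) the WAL, and returns the first error it saw rather than bailing out early. A minimal sketch of that "close everything, report the first error" pattern follows; closeAll is an illustrative helper, not part of the engine.

// Sketch of the pattern used in Engine.Close above: run every closer, keep the first error.
func closeAll(closers ...func() error) error {
    var first error
    for _, c := range closers {
        if err := c(); err != nil && first == nil {
            first = err
        }
    }
    return first
}

A helper like this would collapse the body of Close to a single call, though the explicit form in the diff keeps the WALEnabled guard visible.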

@ -20,6 +20,52 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ChangeType int32
const (
ChangeType_AddMeasurementField ChangeType = 0
ChangeType_DeleteMeasurement ChangeType = 1
)
// Enum value maps for ChangeType.
var (
ChangeType_name = map[int32]string{
0: "AddMeasurementField",
1: "DeleteMeasurement",
}
ChangeType_value = map[string]int32{
"AddMeasurementField": 0,
"DeleteMeasurement": 1,
}
)
func (x ChangeType) Enum() *ChangeType {
p := new(ChangeType)
*p = x
return p
}
func (x ChangeType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (ChangeType) Descriptor() protoreflect.EnumDescriptor {
return file_internal_fieldsindex_proto_enumTypes[0].Descriptor()
}
func (ChangeType) Type() protoreflect.EnumType {
return &file_internal_fieldsindex_proto_enumTypes[0]
}
func (x ChangeType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use ChangeType.Descriptor instead.
func (ChangeType) EnumDescriptor() ([]byte, []int) {
return file_internal_fieldsindex_proto_rawDescGZIP(), []int{0}
}
type Series struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -287,6 +333,116 @@ func (x *MeasurementFieldSet) GetMeasurements() []*MeasurementFields {
return nil
}
type MeasurementFieldChange struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Measurement []byte `protobuf:"bytes,1,opt,name=Measurement,proto3" json:"Measurement,omitempty"`
Field *Field `protobuf:"bytes,2,opt,name=Field,proto3" json:"Field,omitempty"`
Change ChangeType `protobuf:"varint,3,opt,name=Change,proto3,enum=tsdb.ChangeType" json:"Change,omitempty"`
}
func (x *MeasurementFieldChange) Reset() {
*x = MeasurementFieldChange{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_fieldsindex_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *MeasurementFieldChange) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*MeasurementFieldChange) ProtoMessage() {}
func (x *MeasurementFieldChange) ProtoReflect() protoreflect.Message {
mi := &file_internal_fieldsindex_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use MeasurementFieldChange.ProtoReflect.Descriptor instead.
func (*MeasurementFieldChange) Descriptor() ([]byte, []int) {
return file_internal_fieldsindex_proto_rawDescGZIP(), []int{5}
}
func (x *MeasurementFieldChange) GetMeasurement() []byte {
if x != nil {
return x.Measurement
}
return nil
}
func (x *MeasurementFieldChange) GetField() *Field {
if x != nil {
return x.Field
}
return nil
}
func (x *MeasurementFieldChange) GetChange() ChangeType {
if x != nil {
return x.Change
}
return ChangeType_AddMeasurementField
}
type FieldChangeSet struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Changes []*MeasurementFieldChange `protobuf:"bytes,1,rep,name=Changes,proto3" json:"Changes,omitempty"`
}
func (x *FieldChangeSet) Reset() {
*x = FieldChangeSet{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_fieldsindex_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FieldChangeSet) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FieldChangeSet) ProtoMessage() {}
func (x *FieldChangeSet) ProtoReflect() protoreflect.Message {
mi := &file_internal_fieldsindex_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FieldChangeSet.ProtoReflect.Descriptor instead.
func (*FieldChangeSet) Descriptor() ([]byte, []int) {
return file_internal_fieldsindex_proto_rawDescGZIP(), []int{6}
}
func (x *FieldChangeSet) GetChanges() []*MeasurementFieldChange {
if x != nil {
return x.Changes
}
return nil
}
var File_internal_fieldsindex_proto protoreflect.FileDescriptor
var file_internal_fieldsindex_proto_rawDesc = []byte{
@ -311,9 +467,26 @@ var file_internal_fieldsindex_proto_rawDesc = []byte{
0x65, 0x74, 0x12, 0x3b, 0x0a, 0x0c, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e,
0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x74, 0x73, 0x64, 0x62, 0x2e,
0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x46, 0x69, 0x65, 0x6c, 0x64,
0x73, 0x52, 0x0c, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x42,
0x08, 0x5a, 0x06, 0x2e, 0x3b, 0x74, 0x73, 0x64, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
0x73, 0x52, 0x0c, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x22,
0x87, 0x01, 0x0a, 0x16, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x46,
0x69, 0x65, 0x6c, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x4d, 0x65,
0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x05,
0x46, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x74, 0x73,
0x64, 0x62, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x05, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12,
0x28, 0x0a, 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32,
0x10, 0x2e, 0x74, 0x73, 0x64, 0x62, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x54, 0x79, 0x70,
0x65, 0x52, 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x22, 0x48, 0x0a, 0x0e, 0x46, 0x69, 0x65,
0x6c, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, 0x36, 0x0a, 0x07, 0x43,
0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x74,
0x73, 0x64, 0x62, 0x2e, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x46,
0x69, 0x65, 0x6c, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x43, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x73, 0x2a, 0x3c, 0x0a, 0x0a, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x54, 0x79, 0x70,
0x65, 0x12, 0x17, 0x0a, 0x13, 0x41, 0x64, 0x64, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d,
0x65, 0x6e, 0x74, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x10, 0x00, 0x12, 0x15, 0x0a, 0x11, 0x44, 0x65,
0x6c, 0x65, 0x74, 0x65, 0x4d, 0x65, 0x61, 0x73, 0x75, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x10,
0x01, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x3b, 0x74, 0x73, 0x64, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (
@ -328,23 +501,30 @@ func file_internal_fieldsindex_proto_rawDescGZIP() []byte {
return file_internal_fieldsindex_proto_rawDescData
}
var file_internal_fieldsindex_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_internal_fieldsindex_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_internal_fieldsindex_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_internal_fieldsindex_proto_goTypes = []interface{}{
(*Series)(nil), // 0: tsdb.Series
(*Tag)(nil), // 1: tsdb.Tag
(*MeasurementFields)(nil), // 2: tsdb.MeasurementFields
(*Field)(nil), // 3: tsdb.Field
(*MeasurementFieldSet)(nil), // 4: tsdb.MeasurementFieldSet
(ChangeType)(0), // 0: tsdb.ChangeType
(*Series)(nil), // 1: tsdb.Series
(*Tag)(nil), // 2: tsdb.Tag
(*MeasurementFields)(nil), // 3: tsdb.MeasurementFields
(*Field)(nil), // 4: tsdb.Field
(*MeasurementFieldSet)(nil), // 5: tsdb.MeasurementFieldSet
(*MeasurementFieldChange)(nil), // 6: tsdb.MeasurementFieldChange
(*FieldChangeSet)(nil), // 7: tsdb.FieldChangeSet
}
var file_internal_fieldsindex_proto_depIdxs = []int32{
1, // 0: tsdb.Series.Tags:type_name -> tsdb.Tag
3, // 1: tsdb.MeasurementFields.Fields:type_name -> tsdb.Field
2, // 2: tsdb.MeasurementFieldSet.Measurements:type_name -> tsdb.MeasurementFields
3, // [3:3] is the sub-list for method output_type
3, // [3:3] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
2, // 0: tsdb.Series.Tags:type_name -> tsdb.Tag
4, // 1: tsdb.MeasurementFields.Fields:type_name -> tsdb.Field
3, // 2: tsdb.MeasurementFieldSet.Measurements:type_name -> tsdb.MeasurementFields
4, // 3: tsdb.MeasurementFieldChange.Field:type_name -> tsdb.Field
0, // 4: tsdb.MeasurementFieldChange.Change:type_name -> tsdb.ChangeType
6, // 5: tsdb.FieldChangeSet.Changes:type_name -> tsdb.MeasurementFieldChange
6, // [6:6] is the sub-list for method output_type
6, // [6:6] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
}
func init() { file_internal_fieldsindex_proto_init() }
@ -413,19 +593,44 @@ func file_internal_fieldsindex_proto_init() {
return nil
}
}
file_internal_fieldsindex_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*MeasurementFieldChange); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_fieldsindex_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FieldChangeSet); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_fieldsindex_proto_rawDesc,
NumEnums: 0,
NumMessages: 5,
NumEnums: 1,
NumMessages: 7,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_internal_fieldsindex_proto_goTypes,
DependencyIndexes: file_internal_fieldsindex_proto_depIdxs,
EnumInfos: file_internal_fieldsindex_proto_enumTypes,
MessageInfos: file_internal_fieldsindex_proto_msgTypes,
}.Build()
File_internal_fieldsindex_proto = out.File

@ -32,3 +32,18 @@ message Field {
message MeasurementFieldSet {
repeated MeasurementFields Measurements = 1;
}
enum ChangeType {
AddMeasurementField = 0;
DeleteMeasurement = 1;
}
message MeasurementFieldChange {
bytes Measurement = 1;
Field Field = 2;
ChangeType Change = 3;
}
message FieldChangeSet {
repeated MeasurementFieldChange Changes = 1;
}
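
The change-log entries written by shard.go later in this diff are FieldChangeSet messages framed with an 8-byte little-endian length prefix (see marshalFieldChanges and readSizePlusBuffer). A sketch of producing one such record from the messages above; the import path of the generated package and the numeric value for influxql.Float are assumptions here.

// Sketch: encode one AddMeasurementField change as a size-prefixed record.
// The generated package import path is assumed; adjust to the repository layout.
package sketch

import (
    "encoding/binary"

    internal "github.com/influxdata/influxdb/v2/tsdb/internal"
    "google.golang.org/protobuf/proto"
)

func encodeAddField(measurement, field string) ([]byte, error) {
    fcs := &internal.FieldChangeSet{
        Changes: []*internal.MeasurementFieldChange{{
            Measurement: []byte(measurement),
            Field:       &internal.Field{Name: []byte(field), Type: 1}, // 1 == influxql.Float (assumed)
            Change:      internal.ChangeType_AddMeasurementField,
        }},
    }
    var prefix [8]byte // room for the length prefix, as in marshalFieldChanges
    b, err := proto.MarshalOptions{}.MarshalAppend(prefix[:], fcs)
    if err != nil {
        return nil, err
    }
    binary.LittleEndian.PutUint64(b[:8], uint64(len(b)-8))
    return b, nil
}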

@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
@ -19,8 +20,10 @@ import (
"unsafe"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
errors2 "github.com/influxdata/influxdb/v2/pkg/errors"
"github.com/influxdata/influxdb/v2/pkg/estimator"
"github.com/influxdata/influxdb/v2/pkg/file"
"github.com/influxdata/influxdb/v2/pkg/limiter"
@ -35,10 +38,11 @@ import (
const (
measurementKey = "_name"
DefaultMetricInterval = 10 * time.Second
FieldsChangeFile = "fields.idxl"
bytesInInt64 = 8
)
var (
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
ErrFieldTypeConflict = errors.New("field type conflict")
@ -870,14 +874,19 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
}
// add fields
changes := make([]*FieldChange, 0, len(fieldsToCreate))
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return err
}
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
return engine.MeasurementFieldSet().Save()
return engine.MeasurementFieldSet().Save(changes)
}
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
@ -1917,33 +1926,65 @@ func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataT
}
}
type FieldChanges []*FieldChange
func MeasurementsToFieldChangeDeletions(measurements []string) FieldChanges {
fcs := make([]*FieldChange, 0, len(measurements))
for _, m := range measurements {
fcs = append(fcs, &FieldChange{
FieldCreate: FieldCreate{
Measurement: []byte(m),
Field: nil,
},
ChangeType: DeleteMeasurement,
})
}
return fcs
}
// MeasurementFieldSet represents a collection of fields by measurement.
// This safe for concurrent use.
type MeasurementFieldSet struct {
mu sync.RWMutex
fields map[string]*MeasurementFields
// path is the location to persist field sets
path string
writer *MeasurementFieldSetWriter
path string
changeMgr *measurementFieldSetChangeMgr
}
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
func NewMeasurementFieldSet(path string, logger *zap.Logger) (*MeasurementFieldSet, error) {
const MaxCombinedWrites = 100
fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
path: path,
}
fs.SetMeasurementFieldSetWriter(MaxCombinedWrites)
if nil == logger {
logger = zap.NewNop()
}
fs.SetMeasurementFieldSetWriter(MaxCombinedWrites, logger)
// If there is a load error, return the error and an empty set so
// it can be rebuilt manually.
return fs, fs.load()
}
func (fs *MeasurementFieldSet) Close() {
if fs != nil && fs.writer != nil {
fs.writer.Close()
func (fs *MeasurementFieldSet) Close() error {
if fs != nil && fs.changeMgr != nil {
fs.changeMgr.Close()
// If there is a change log file, save the in-memory version
if _, err := os.Stat(fs.changeMgr.changeFilePath); err == nil {
return fs.WriteToFile()
} else if os.IsNotExist(err) {
return nil
} else {
return fmt.Errorf("cannot get file information for %s: %w", fs.changeMgr.changeFilePath, err)
}
}
return nil
}
func (fs *MeasurementFieldSet) ChangesPath() string {
return fs.changeMgr.changeFilePath
}
// Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes.
@ -2014,7 +2055,7 @@ func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *Measurement
// Delete removes a field set for a measurement.
func (fs *MeasurementFieldSet) Delete(name string) {
fs.mu.Lock()
delete(fs.fields, name)
fs.deleteNoLock(name)
fs.mu.Unlock()
}
@ -2027,10 +2068,15 @@ func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) erro
return err
}
delete(fs.fields, name)
fs.deleteNoLock(name)
return nil
}
// deleteNoLock removes a field set for a measurement
func (fs *MeasurementFieldSet) deleteNoLock(name string) {
delete(fs.fields, name)
}
func (fs *MeasurementFieldSet) IsEmpty() bool {
fs.mu.RLock()
defer fs.mu.RUnlock()
@ -2040,154 +2086,247 @@ func (fs *MeasurementFieldSet) IsEmpty() bool {
type errorChannel chan<- error
type writeRequest struct {
done errorChannel
errorReturn chan<- error
changes FieldChanges
}
type MeasurementFieldSetWriter struct {
wg sync.WaitGroup
writeRequests chan writeRequest
type measurementFieldSetChangeMgr struct {
mu sync.Mutex
wg sync.WaitGroup
writeRequests chan writeRequest
changeFilePath string
logger *zap.Logger
changeFileSize int64
}
// SetMeasurementFieldSetWriter - initialize the queue for write requests
// and start the background write process
func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int) {
func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int, logger *zap.Logger) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.writer = &MeasurementFieldSetWriter{writeRequests: make(chan writeRequest, queueLength)}
fs.writer.wg.Add(1)
go fs.saveWriter()
fs.changeMgr = &measurementFieldSetChangeMgr{
writeRequests: make(chan writeRequest, queueLength),
changeFilePath: filepath.Join(filepath.Dir(fs.path), FieldsChangeFile),
logger: logger,
changeFileSize: int64(0),
}
fs.changeMgr.wg.Add(1)
go fs.changeMgr.SaveWriter()
}
func (w *MeasurementFieldSetWriter) Close() {
if w != nil {
close(w.writeRequests)
w.wg.Wait()
func (fscm *measurementFieldSetChangeMgr) Close() {
if fscm != nil {
close(fscm.writeRequests)
fscm.wg.Wait()
}
}
func (fs *MeasurementFieldSet) Save() error {
return fs.writer.RequestSave()
func (fs *MeasurementFieldSet) Save(changes FieldChanges) error {
return fs.changeMgr.RequestSave(changes)
}
func (w *MeasurementFieldSetWriter) RequestSave() error {
func (fscm *measurementFieldSetChangeMgr) RequestSave(changes FieldChanges) error {
done := make(chan error)
wr := writeRequest{done: done}
w.writeRequests <- wr
fscm.writeRequests <- writeRequest{errorReturn: done, changes: changes}
return <-done
}
func (fs *MeasurementFieldSet) saveWriter() {
defer fs.writer.wg.Done()
// Block until someone modifies the MeasurementFieldSet and
// it needs to be written to disk.
for req, ok := <-fs.writer.writeRequests; ok; req, ok = <-fs.writer.writeRequests {
fs.writeToFile(req)
func (fscm *measurementFieldSetChangeMgr) SaveWriter() {
defer fscm.wg.Done()
// Block until someone modifies the MeasurementFieldSet, and
// it needs to be written to disk. Exit when the channel is closed
for wr, ok := <-fscm.writeRequests; ok; wr, ok = <-fscm.writeRequests {
fscm.appendToChangesFile(wr)
}
}
// writeToFile: Write the new index to a temp file and rename when it's sync'd
func (fs *MeasurementFieldSet) writeToFile(first writeRequest) {
var err error
// Put the errorChannel on which we blocked into a slice to allow more invocations
// to share the return code from the file write
errorChannels := []errorChannel{first.done}
defer func() {
for _, c := range errorChannels {
c <- err
close(c)
}
}()
// Do some blocking IO operations before marshalling the in-memory index
// to allow other changes to it to be queued up and be captured in one
// write operation, in case we are under heavy field creation load
// WriteToFile: Write the new index to a temp file and rename when it's sync'd
// This locks the MeasurementFieldSet during the marshaling, the write, and the rename.
func (fs *MeasurementFieldSet) WriteToFile() error {
path := fs.path + ".tmp"
// Open the temp file
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return
return fmt.Errorf("failed opening %s: %w", fs.path, err)
}
// Ensure temp file is cleaned up
defer func() {
if e := os.RemoveAll(path); err == nil {
err = e
if e := os.RemoveAll(path); err == nil && e != nil {
err = fmt.Errorf("failed removing temporary file %s: %w", path, e)
}
if e := os.RemoveAll(fs.changeMgr.changeFilePath); err == nil && e != nil {
err = fmt.Errorf("failed removing saved field changes - %s: %w", fs.changeMgr.changeFilePath, e)
}
}()
fs.mu.RLock()
defer fs.mu.RUnlock()
isEmpty, err := func() (isEmpty bool, err error) {
// ensure temp file closed before rename (for Windows)
defer func() {
if e := fd.Close(); err == nil {
err = e
if e := fd.Close(); err == nil && e != nil {
err = fmt.Errorf("closing %s: %w", path, e)
}
}()
if _, err = fd.Write(fieldsIndexMagicNumber); err != nil {
return true, err
return true, fmt.Errorf("failed writing magic number for %s: %w", path, err)
}
// Read all the pending new field and measurement write requests
// that will be captured in the marshaling of the in-memory copy
for {
select {
case ec := <-fs.writer.writeRequests:
errorChannels = append(errorChannels, ec.done)
continue
default:
}
break
}
// Lock, copy, and marshal the in-memory index
b, err := fs.marshalMeasurementFieldSet()
b, err := fs.marshalMeasurementFieldSetNoLock()
if err != nil {
return true, err
return true, fmt.Errorf("failed marshaling fields for %s: %w", fs.path, err)
}
if b == nil {
// No fields, file removed, all done
return true, nil
}
if _, err := fd.Write(b); err != nil {
return true, err
return true, fmt.Errorf("failed saving fields to %s: %w", path, err)
}
return false, fd.Sync()
return false, nil
}()
if err != nil || isEmpty {
if err != nil {
return err
} else if isEmpty {
// remove empty file
if err = os.RemoveAll(fs.path); err != nil {
return fmt.Errorf("cannot remove %s: %w", fs.path, err)
} else {
return nil
}
}
return fs.renameFileNoLock(path)
}
// appendToChangesFile: Write a change file for fields.idx
// Only called in one Go proc, so does not need locking.
func (fscm *measurementFieldSetChangeMgr) appendToChangesFile(first writeRequest) {
var err error = nil
// Put the errorChannel on which we blocked into a slice to allow more invocations
// to share the return code from the file write
errorChannels := []errorChannel{first.errorReturn}
changes := []FieldChanges{first.changes}
// On return, send the error to every go proc that sent changes
defer func() {
for _, c := range errorChannels {
c <- err
close(c)
}
}()
log, end := logger.NewOperation(context.TODO(), fscm.logger, "saving field index changes", "MeasurementFieldSet")
defer end()
// Do some blocking IO operations before marshalling the changes,
// to allow other changes to be queued up and be captured in one
// write operation, in case we are under heavy field creation load
fscm.mu.Lock()
defer fscm.mu.Unlock()
fd, err := os.OpenFile(fscm.changeFilePath, os.O_CREATE|os.O_APPEND|os.O_SYNC|os.O_WRONLY, 0666)
if err != nil {
err = fmt.Errorf("opening %s: %w", fscm.changeFilePath, err)
log.Error("failed", zap.Error(err))
return
}
err = fs.renameFile(path)
}
// marshalMeasurementFieldSet: remove the fields.idx file if no fields
// otherwise, copy the in-memory version into a protobuf to write to
// disk
func (fs *MeasurementFieldSet) marshalMeasurementFieldSet() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if len(fs.fields) == 0 {
// If no fields left, remove the fields index file
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
// ensure file closed
defer errors2.Capture(&err, func() error {
if e := fd.Close(); e != nil {
e = fmt.Errorf("closing %s: %w", fd.Name(), e)
log.Error("failed", zap.Error(e))
return e
} else {
return nil, nil
return nil
}
})()
var fi os.FileInfo
if fi, err = fd.Stat(); err != nil {
err = fmt.Errorf("unable to get size of %s: %w", fd.Name(), err)
log.Error("failed", zap.Error(err))
return
} else if fi.Size() > fscm.changeFileSize {
// If we had a partial write last time, truncate the file to remove it.
if err = fd.Truncate(fscm.changeFileSize); err != nil {
err = fmt.Errorf("cannot truncate %s to last known good size of %d after incomplete write: %w", fd.Name(), fscm.changeFileSize, err)
log.Error("failed", zap.Error(err))
return
}
}
return fs.marshalMeasurementFieldSetNoLock()
}
func (fs *MeasurementFieldSet) renameFile(path string) error {
fs.mu.Lock()
defer fs.mu.Unlock()
if err := file.RenameFile(path, fs.path); err != nil {
return err
// Read all the pending field and measurement write or delete
// requests
for {
select {
case wr := <-fscm.writeRequests:
changes = append(changes, wr.changes)
errorChannels = append(errorChannels, wr.errorReturn)
continue
default:
}
break
}
// marshal the slice of slices of field changes in size-prefixed protobuf
var b []byte
b, err = marshalFieldChanges(changes...)
if err != nil {
err = fmt.Errorf("error marshaling changes for %s: %w", fd.Name(), err)
log.Error("failed", zap.Error(err))
return
}
if err := file.SyncDir(filepath.Dir(fs.path)); err != nil {
return err
if _, err = fd.Write(b); err != nil {
err = fmt.Errorf("failed writing to %s: %w", fd.Name(), err)
log.Error("failed", zap.Error(err))
return
} else if fi, err = fd.Stat(); err != nil {
err = fmt.Errorf("unable to get final size of %s after appendation: %w", fd.Name(), err)
log.Error("failed", zap.Error(err))
return
} else {
fscm.changeFileSize = fi.Size()
}
}
func readSizePlusBuffer(r io.Reader, b []byte) ([]byte, error) {
var numBuf [bytesInInt64]byte
if _, err := r.Read(numBuf[:]); err != nil {
return nil, err
}
size := int(binary.LittleEndian.Uint64(numBuf[:]))
if cap(b) < size {
b = make([]byte, size)
}
_, err := io.ReadAtLeast(r, b, size)
if err != nil {
return nil, err
}
return b, nil
}
func (fs *MeasurementFieldSet) renameFileNoLock(path string) error {
if err := file.RenameFile(path, fs.path); err != nil {
return fmt.Errorf("cannot rename %s to %s: %w", path, fs.path, err)
}
dir := filepath.Dir(fs.path)
if err := file.SyncDir(dir); err != nil {
return fmt.Errorf("cannot sync directory %s: %w", dir, err)
}
return nil
}
// marshalMeasurementFieldSetNoLock: remove the fields.idx file if no fields
// otherwise, copy the in-memory version into a protobuf to write to
// disk
func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled []byte, err error) {
if len(fs.fields) == 0 {
// If no fields left, remove the fields index file
return nil, nil
}
pb := internal.MeasurementFieldSet{
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
}
@ -2213,48 +2352,202 @@ func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled []
}
}
func (fs *MeasurementFieldSet) load() error {
fs.mu.Lock()
defer fs.mu.Unlock()
func marshalFieldChanges(changeSet ...FieldChanges) ([]byte, error) {
fcs := internal.FieldChangeSet{
Changes: nil,
}
for _, fc := range changeSet {
for _, f := range fc {
mfc := &internal.MeasurementFieldChange{
Measurement: f.Measurement,
Change: internal.ChangeType(f.ChangeType),
}
if f.Field != nil {
mfc.Field = &internal.Field{
Name: []byte(f.Field.Name),
Type: int32(f.Field.Type),
}
fcs.Changes = append(fcs.Changes, mfc)
}
}
}
mo := proto.MarshalOptions{}
var numBuf [bytesInInt64]byte
b, err := mo.MarshalAppend(numBuf[:], &fcs)
binary.LittleEndian.PutUint64(b[0:bytesInInt64], uint64(len(b)-bytesInInt64))
if err != nil {
fields := make([]string, 0, len(fcs.Changes))
for _, fc := range changeSet {
for _, f := range fc {
fields = append(fields, fmt.Sprintf("%q.%q", f.Measurement, f.Field.Name))
}
}
return nil, fmt.Errorf("failed marshaling new fields - %s: %w", strings.Join(fields, ", "), err)
}
return b, nil
}
func (fs *MeasurementFieldSet) load() (rErr error) {
err := func() error {
fs.mu.Lock()
defer fs.mu.Unlock()
pb, err := fs.loadParseFieldIndexPB()
if err != nil {
return err
}
fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
for _, measurement := range pb.GetMeasurements() {
fields := make(map[string]*Field, len(measurement.GetFields()))
for _, field := range measurement.GetFields() {
fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())}
}
set := &MeasurementFields{}
set.fields.Store(fields)
fs.fields[string(measurement.GetName())] = set
}
return nil
}()
if err != nil {
return fmt.Errorf("failed loading field indices: %w", err)
}
return fs.ApplyChanges()
}
func (fs *MeasurementFieldSet) loadParseFieldIndexPB() (pb *internal.MeasurementFieldSet, rErr error) {
pb = &internal.MeasurementFieldSet{}
fd, err := os.Open(fs.path)
if os.IsNotExist(err) {
return nil
return pb, nil
} else if err != nil {
return err
err = fmt.Errorf("failed opening %s: %w", fs.path, err)
return nil, err
}
defer fd.Close()
defer errors2.Capture(&rErr, func() error {
if e := fd.Close(); e != nil {
return fmt.Errorf("failed closing %s: %w", fd.Name(), e)
} else {
return nil
}
})()
var magic [4]byte
if _, err := fd.Read(magic[:]); err != nil {
return err
err = fmt.Errorf("failed reading %s: %w", fs.path, err)
return nil, err
}
if !bytes.Equal(magic[:], fieldsIndexMagicNumber) {
return ErrUnknownFieldsFormat
return nil, fmt.Errorf("%q: %w", fs.path, ErrUnknownFieldsFormat)
}
var pb internal.MeasurementFieldSet
b, err := io.ReadAll(fd)
if err != nil {
err = fmt.Errorf("failed reading %s: %w", fs.path, err)
return nil, err
}
if err = proto.Unmarshal(b, pb); err != nil {
err = fmt.Errorf("failed unmarshaling %s: %w", fs.path, err)
return nil, err
}
return pb, err
}
func (fscm *measurementFieldSetChangeMgr) loadAllFieldChanges(log *zap.Logger) (changes []FieldChanges, rErr error) {
var fcs FieldChanges
fscm.mu.Lock()
defer fscm.mu.Unlock()
fd, err := os.Open(fscm.changeFilePath)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
err = fmt.Errorf("failed opening %s: %w", fscm.changeFilePath, err)
log.Error("field index file of changes", zap.Error(err))
return nil, err
}
defer errors2.Capture(&rErr, func() error {
if e := fd.Close(); e != nil {
return fmt.Errorf("failed closing %s: %w", fd.Name(), e)
} else {
return nil
}
})()
for fcs, err = fscm.loadFieldChangeSet(fd); err == nil; fcs, err = fscm.loadFieldChangeSet(fd) {
changes = append(changes, fcs)
}
if errors.Is(err, io.EOF) {
return changes, nil
} else if errors.Is(err, io.ErrUnexpectedEOF) {
log.Warn("last entry was an incomplete write", zap.Error(err))
return changes, nil
} else {
log.Error("field index file of changes", zap.Error(err))
return nil, err
}
}
func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (FieldChanges, error) {
var pb internal.FieldChangeSet
b, err := readSizePlusBuffer(r, nil)
if err != nil {
return nil, fmt.Errorf("failed reading %s: %w", fscm.changeFilePath, err)
}
if err := proto.Unmarshal(b, &pb); err != nil {
return nil, fmt.Errorf("failed unmarshalling %s: %w", fscm.changeFilePath, err)
}
fcs := make([]*FieldChange, 0, len(pb.Changes))
for _, fc := range pb.Changes {
fcs = append(fcs, &FieldChange{
FieldCreate: FieldCreate{
Measurement: fc.Measurement,
Field: &Field{
ID: 0,
Name: string(fc.Field.Name),
Type: influxql.DataType(fc.Field.Type),
},
},
ChangeType: ChangeType(fc.Change),
})
}
return fcs, nil
}
func (fs *MeasurementFieldSet) ApplyChanges() error {
log, end := logger.NewOperation(context.TODO(), fs.changeMgr.logger, "failed loading changes", "field indices")
defer end()
changes, err := fs.changeMgr.loadAllFieldChanges(log)
if err != nil {
return err
}
if err := proto.Unmarshal(b, &pb); err != nil {
return err
if len(changes) <= 0 {
return os.RemoveAll(fs.changeMgr.changeFilePath)
}
fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
for _, measurement := range pb.GetMeasurements() {
fields := make(map[string]*Field, len(measurement.GetFields()))
for _, field := range measurement.GetFields() {
fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())}
for _, fcs := range changes {
for _, fc := range fcs {
if fc.ChangeType == DeleteMeasurement {
fs.Delete(string(fc.Measurement))
} else {
mf := fs.CreateFieldsIfNotExists(fc.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err)
log.Error("field creation", zap.Error(err))
return err
}
}
}
set := &MeasurementFields{}
set.fields.Store(fields)
fs.fields[string(measurement.GetName())] = set
}
return nil
return fs.WriteToFile()
}
// Field represents a series field. All of the fields must be hashable.
@ -2264,6 +2557,18 @@ type Field struct {
Type influxql.DataType `json:"type,omitempty"`
}
type FieldChange struct {
FieldCreate
ChangeType ChangeType
}
type ChangeType int
const (
AddMeasurementField = ChangeType(internal.ChangeType_AddMeasurementField)
DeleteMeasurement = ChangeType(internal.ChangeType_DeleteMeasurement)
)
// NewFieldKeysIterator returns an iterator that can be iterated over to
// retrieve field keys.
func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {

@ -3,7 +3,11 @@ package tsdb_test
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"math"
"os"
"path/filepath"
@ -16,18 +20,16 @@ import (
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/internal"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/deep"
"github.com/influxdata/influxdb/v2/pkg/testing/assert"
"github.com/influxdata/influxdb/v2/tsdb"
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
_ "github.com/influxdata/influxdb/v2/tsdb/index"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/assert"
assert2 "github.com/stretchr/testify/assert"
)
func TestShardWriteAndIndex(t *testing.T) {
@ -1518,31 +1520,48 @@ _reserved,region=uswest value="foo" 0
}
func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
const measurement = "cpu"
const fieldName = "value"
dir, cleanup := MustTempDir()
defer cleanup()
path := filepath.Join(dir, "fields.idx")
mf, err := tsdb.NewMeasurementFieldSet(path)
mf, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mf.Close()
fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil {
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
if err := mf.Save(); err != nil {
t.Fatalf("save error: %v", err)
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
mf2, err := tsdb.NewMeasurementFieldSet(path)
if err := mf.Save(tsdb.FieldChanges{&change}); err != nil {
t.Fatalf("save error: %v", err)
}
_, err = os.Stat(mf.ChangesPath())
assert.NoError(t, err, "no field.idx change file")
mf2, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mf2.Close()
fields = mf2.FieldsByString("cpu")
field := fields.Field("value")
_, err = os.Stat(mf.ChangesPath())
assert2.Error(t, err, "file %s should have had this error: %s", mf.ChangesPath(), os.ErrNotExist)
if !os.IsNotExist(err) {
t.Fatalf("unexpected error for %s: got %s, expected %s", mf.ChangesPath(), err, os.ErrNotExist)
}
defer checkMeasurementFieldSetClose(t, mf2)
fields = mf2.FieldsByString(measurement)
field := fields.Field(fieldName)
if field == nil {
t.Fatalf("field is null")
}
@ -1558,17 +1577,26 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) {
path := filepath.Join(dir, "fields.idx")
func() {
mf, err := tsdb.NewMeasurementFieldSet(path)
mf, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mf.Close()
fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil {
defer checkMeasurementFieldSetClose(t, mf)
measurement := []byte("cpu")
fields := mf.CreateFieldsIfNotExists(measurement)
fieldName := "value"
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
if err := mf.Save(); err != nil {
if err := mf.Save(tsdb.FieldChanges{&change}); err != nil {
t.Fatalf("save error: %v", err)
}
}()
@ -1580,42 +1608,137 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) {
if err := os.Truncate(path, stat.Size()-3); err != nil {
t.Fatalf("truncate error: %v", err)
}
mf, err := tsdb.NewMeasurementFieldSet(path)
mf, err := tsdb.NewMeasurementFieldSet(path, nil)
if err == nil {
t.Fatal("NewMeasurementFieldSet expected error")
}
defer mf.Close()
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.FieldsByString("cpu")
if fields != nil {
t.Fatal("expecte fields to be nil")
}
}
func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()
testFields := []struct {
Measurement string
Field string
FieldType influxql.DataType
}{
{
Measurement: "cpu",
Field: "value_1",
FieldType: influxql.Float,
},
{
Measurement: "cpu",
Field: "value_2",
FieldType: influxql.String,
},
{
Measurement: "cpu",
Field: "value_3",
FieldType: influxql.Integer,
},
}
path := filepath.Join(dir, "fields.idx")
var mf *tsdb.MeasurementFieldSet
var err error
mf, err = tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer checkMeasurementFieldSetClose(t, mf)
for _, f := range testFields {
fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement))
if err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(f.Measurement),
Field: &tsdb.Field{ID: 0, Name: f.Field, Type: f.FieldType},
},
ChangeType: tsdb.AddMeasurementField,
}
if err := mf.Save(tsdb.FieldChanges{&change}); err != nil {
t.Fatalf("save error: %v", err)
}
}
changeFile := filepath.Join(dir, tsdb.FieldsChangeFile)
stat, err := os.Stat(changeFile)
if err != nil {
t.Fatalf("stat error: %v", err)
}
// Truncate the file to simulate a corrupted file
if err := os.Truncate(changeFile, stat.Size()-3); err != nil {
t.Fatalf("truncate error: %v", err)
}
mf2, err := tsdb.NewMeasurementFieldSet(path, nil)
assert.NoError(t, err, "failed creating second MeasurementFieldSet")
defer checkMeasurementFieldSetClose(t, mf2)
for i := 0; i < len(testFields)-1; i++ {
fields := mf2.FieldsByString(testFields[i].Measurement)
if fields == nil {
t.Fatalf("nil fields map for %s", testFields[i].Measurement)
} else if f := fields.Field(testFields[i].Field); f == nil {
t.Fatalf("%s not found in %s fields", testFields[i].Field, testFields[i].Measurement)
} else if f.Type != testFields[i].FieldType {
t.Fatalf("%s.%s wrong type: expected %v, got %v", testFields[i].Measurement, testFields[i].Field, testFields[i].FieldType, f.Type)
}
}
i := len(testFields) - 1
fields := mf2.FieldsByString(testFields[i].Measurement)
if fields == nil {
t.Fatalf("nil fields map for %s", testFields[i].Measurement)
} else if f := fields.Field(testFields[i].Field); f != nil {
t.Fatalf("%s found in %s fields, should have not been present", testFields[i].Field, testFields[i].Measurement)
}
}
func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
const measurement = "cpu"
const fieldName = "value"
dir, cleanup := MustTempDir()
defer cleanup()
path := filepath.Join(dir, "fields.idx")
mf, err := tsdb.NewMeasurementFieldSet(path)
mf, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mf.Close()
fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil {
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
if err := mf.Save(); err != nil {
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
if err := mf.Save(tsdb.FieldChanges{&change}); err != nil {
t.Fatalf("save error: %v", err)
}
mf2, err := tsdb.NewMeasurementFieldSet(path)
mf2, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mf2.Close()
fields = mf2.FieldsByString("cpu")
field := fields.Field("value")
fields = mf2.FieldsByString(measurement)
field := fields.Field(fieldName)
if field == nil {
t.Fatalf("field is null")
}
@ -1624,17 +1747,35 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
}
mf2.Delete("cpu")
mf2.Delete(measurement)
if err := mf2.Save(); err != nil {
if err := mf2.Save(tsdb.MeasurementsToFieldChangeDeletions([]string{measurement})); err != nil {
t.Fatalf("save after delete error: %v", err)
}
_, err = os.Stat(mf.ChangesPath())
assert.NoError(t, err, "no field.idx change file")
assert.NoError(t, mf2.Close(), "failed closing MeasurementFieldSet")
if _, err := os.Stat(path); !os.IsNotExist(err) {
_, err = os.Stat(mf.ChangesPath())
assert2.Error(t, err, "file %s should have had this error: %s", mf.ChangesPath(), os.ErrNotExist)
if !os.IsNotExist(err) {
t.Fatalf("unexpected error for %s: got %s, expected %s", mf.ChangesPath(), err, os.ErrNotExist)
}
if _, err = os.Stat(path); !os.IsNotExist(err) {
t.Fatalf("got %v, not exist err", err)
}
}
func checkMeasurementFieldSetClose(t *testing.T, fs *tsdb.MeasurementFieldSet) {
assert.NoError(t, fs.Close(), "failed closing tsdb.MeasurementFieldSet")
_, err := os.Stat(fs.ChangesPath())
assert2.Error(t, err, "file %s should have had this error: %s", fs.ChangesPath(), os.ErrNotExist)
if !os.IsNotExist(err) {
t.Fatalf("unexpected error for %s: got %s, expected %s", fs.ChangesPath(), err, os.ErrNotExist)
}
}
func TestMeasurementFieldSet_InvalidFormat(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()
@ -1645,11 +1786,11 @@ func TestMeasurementFieldSet_InvalidFormat(t *testing.T) {
t.Fatalf("error writing fields.index: %v", err)
}
mf, err := tsdb.NewMeasurementFieldSet(path)
if err != tsdb.ErrUnknownFieldsFormat {
mf, err := tsdb.NewMeasurementFieldSet(path, nil)
if !errors.Is(err, tsdb.ErrUnknownFieldsFormat) {
t.Fatalf("unexpected error: got %v, exp %v", err, tsdb.ErrUnknownFieldsFormat)
}
defer mf.Close()
defer checkMeasurementFieldSetClose(t, mf)
}
func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
@ -1673,11 +1814,11 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
}
path := filepath.Join(dir, "fields.idx")
mfs, err := tsdb.NewMeasurementFieldSet(path)
mfs, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mfs.Close()
defer checkMeasurementFieldSetClose(t, mfs)
var wg sync.WaitGroup
wg.Add(len(ft))
@ -1686,11 +1827,11 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
}
wg.Wait()
mfs2, err := tsdb.NewMeasurementFieldSet(path)
mfs2, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
defer mfs2.Close()
defer checkMeasurementFieldSetClose(t, mfs2)
for i, fs := range ft {
mf := mfs.Fields([]byte(mt[i]))
mf2 := mfs2.Fields([]byte(mt[i]))
@ -1708,7 +1849,7 @@ func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
func TestMeasurementFieldSet_MeasurementNames(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "fields.idx")
mf, err := tsdb.NewMeasurementFieldSet(path)
mf, err := tsdb.NewMeasurementFieldSet(path, nil)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
@ -1726,15 +1867,31 @@ func TestMeasurementFieldSet_MeasurementNames(t *testing.T) {
func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldSet, measurement string, fieldNames []string) {
defer wg.Done()
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
for _, fieldName := range fieldNames {
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Errorf("create field error: %v", err)
return
}
if err := mf.Save(); err != nil {
t.Errorf("save error: %v", err)
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
err := mf.Save(tsdb.FieldChanges{&change})
if err != nil {
t.Logf("save error: %v", err)
t.Fail()
return
}
_, err = os.Stat(mf.ChangesPath())
if err != nil {
t.Logf("unexpected error for field.idxl change file %s: %s", mf.ChangesPath(), err)
t.Fail()
}
}
}