Save field types to speed up startup

This persists the field types in a shard to avoid having to scan
all the TSM files at startup.
pull/9147/head
Jason Wilder 2017-11-21 16:52:58 -07:00
parent c8b24b7939
commit c14b0e81b7
8 changed files with 368 additions and 67 deletions

20
pkg/file/file_unix.go Normal file
View File

@ -0,0 +1,20 @@
// +build !windows
package file
import "os"
func SyncDir(dirName string) error {
// fsync the dir to flush the rename
dir, err := os.OpenFile(dirName, os.O_RDONLY, os.ModeDir)
if err != nil {
return err
}
defer dir.Close()
return dir.Sync()
}
// RenameFile will rename the source to target using os function.
func RenameFile(oldpath, newpath string) error {
return os.Rename(oldpath, newpath)
}

18
pkg/file/file_windows.go Normal file
View File

@ -0,0 +1,18 @@
package file
import "os"
func SyncDir(dirName string) error {
return nil
}
// RenameFile will rename the source to target using os function. If target exists it will be removed before renaming.
func RenameFile(oldpath, newpath string) error {
if _, err := os.Stat(newpath); err == nil {
if err = os.Remove(newpath); nil != err {
return err
}
}
return os.Rename(oldpath, newpath)
}

View File

@ -64,6 +64,7 @@ type Engine interface {
MeasurementFields(measurement []byte) *MeasurementFields MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error DeleteMeasurement(name []byte) error
MeasurementFieldSet() *MeasurementFieldSet
// TagKeys(name []byte) ([][]byte, error) // TagKeys(name []byte) ([][]byte, error)
HasTagKey(name, key []byte) (bool, error) HasTagKey(name, key []byte) (bool, error)

View File

@ -189,8 +189,6 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
traceLogger: logger, traceLogger: logger,
traceLogging: opt.Config.TraceLoggingEnabled, traceLogging: opt.Config.TraceLoggingEnabled,
fieldset: tsdb.NewMeasurementFieldSet(),
WAL: w, WAL: w,
Cache: cache, Cache: cache,
@ -206,9 +204,6 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()), scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
} }
// Attach fieldset to index.
e.index.SetFieldSet(e.fieldset)
if e.traceLogging { if e.traceLogging {
fs.enableTraceLogging(true) fs.enableTraceLogging(true)
w.enableTraceLogging(true) w.enableTraceLogging(true)
@ -429,6 +424,10 @@ func (e *Engine) MeasurementFields(measurement []byte) *tsdb.MeasurementFields {
return e.fieldset.CreateFieldsIfNotExists(measurement) return e.fieldset.CreateFieldsIfNotExists(measurement)
} }
func (e *Engine) MeasurementFieldSet() *tsdb.MeasurementFieldSet {
return e.fieldset
}
func (e *Engine) HasTagKey(name, key []byte) (bool, error) { func (e *Engine) HasTagKey(name, key []byte) (bool, error) {
return e.index.HasTagKey(name, key) return e.index.HasTagKey(name, key)
} }
@ -574,6 +573,17 @@ func (e *Engine) Open() error {
return err return err
} }
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"))
if err != nil {
return err
}
e.mu.Lock()
e.fieldset = fields
e.mu.Unlock()
e.index.SetFieldSet(fields)
if err := e.WAL.Open(); err != nil { if err := e.WAL.Open(); err != nil {
return err return err
} }
@ -629,6 +639,12 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
// Save reference to index for iterator creation. // Save reference to index for iterator creation.
e.index = index e.index = index
// If we have the cached fields index on disk and and we're using TSI, we
// can skip scanning all the TSM files.
if e.index.Type() != inmem.IndexName && !e.fieldset.IsEmpty() {
return nil
}
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error { if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil { if err != nil {
@ -658,6 +674,11 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return err return err
} }
// Save the field set index so we don't have to rebuild it next time
if err := e.fieldset.Save(); err != nil {
return err
}
e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now))) e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now)))
return nil return nil
} }
@ -1275,7 +1296,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
return err return err
} }
return nil return e.fieldset.Save()
} }
// DeleteMeasurement deletes a measurement and all related series. // DeleteMeasurement deletes a measurement and all related series.

View File

@ -1,9 +1,8 @@
// Code generated by protoc-gen-gogo. // Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: internal/meta.proto // source: internal/meta.proto
// DO NOT EDIT!
/* /*
Package meta is a generated protocol buffer package. Package tsdb is a generated protocol buffer package.
It is generated from these files: It is generated from these files:
internal/meta.proto internal/meta.proto
@ -13,8 +12,9 @@ It has these top-level messages:
Tag Tag
MeasurementFields MeasurementFields
Field Field
MeasurementFieldSet
*/ */
package meta package tsdb
import proto "github.com/gogo/protobuf/proto" import proto "github.com/gogo/protobuf/proto"
import fmt "fmt" import fmt "fmt"
@ -32,9 +32,8 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Series struct { type Series struct {
Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"` Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"` Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"`
XXX_unrecognized []byte `json:"-"`
} }
func (m *Series) Reset() { *m = Series{} } func (m *Series) Reset() { *m = Series{} }
@ -43,8 +42,8 @@ func (*Series) ProtoMessage() {}
func (*Series) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{0} } func (*Series) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{0} }
func (m *Series) GetKey() string { func (m *Series) GetKey() string {
if m != nil && m.Key != nil { if m != nil {
return *m.Key return m.Key
} }
return "" return ""
} }
@ -57,9 +56,8 @@ func (m *Series) GetTags() []*Tag {
} }
type Tag struct { type Tag struct {
Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"` Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
Value *string `protobuf:"bytes,2,req,name=Value" json:"Value,omitempty"` Value string `protobuf:"bytes,2,opt,name=Value,proto3" json:"Value,omitempty"`
XXX_unrecognized []byte `json:"-"`
} }
func (m *Tag) Reset() { *m = Tag{} } func (m *Tag) Reset() { *m = Tag{} }
@ -68,22 +66,22 @@ func (*Tag) ProtoMessage() {}
func (*Tag) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{1} } func (*Tag) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{1} }
func (m *Tag) GetKey() string { func (m *Tag) GetKey() string {
if m != nil && m.Key != nil { if m != nil {
return *m.Key return m.Key
} }
return "" return ""
} }
func (m *Tag) GetValue() string { func (m *Tag) GetValue() string {
if m != nil && m.Value != nil { if m != nil {
return *m.Value return m.Value
} }
return "" return ""
} }
type MeasurementFields struct { type MeasurementFields struct {
Fields []*Field `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"` Name string `protobuf:"bytes,1,opt,name=Name,proto3" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"` Fields []*Field `protobuf:"bytes,2,rep,name=Fields" json:"Fields,omitempty"`
} }
func (m *MeasurementFields) Reset() { *m = MeasurementFields{} } func (m *MeasurementFields) Reset() { *m = MeasurementFields{} }
@ -91,6 +89,13 @@ func (m *MeasurementFields) String() string { return proto.CompactTex
func (*MeasurementFields) ProtoMessage() {} func (*MeasurementFields) ProtoMessage() {}
func (*MeasurementFields) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{2} } func (*MeasurementFields) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{2} }
func (m *MeasurementFields) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *MeasurementFields) GetFields() []*Field { func (m *MeasurementFields) GetFields() []*Field {
if m != nil { if m != nil {
return m.Fields return m.Fields
@ -99,10 +104,9 @@ func (m *MeasurementFields) GetFields() []*Field {
} }
type Field struct { type Field struct {
ID *int32 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` ID int32 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"`
Type *int32 `protobuf:"varint,3,req,name=Type" json:"Type,omitempty"` Type int32 `protobuf:"varint,3,opt,name=Type,proto3" json:"Type,omitempty"`
XXX_unrecognized []byte `json:"-"`
} }
func (m *Field) Reset() { *m = Field{} } func (m *Field) Reset() { *m = Field{} }
@ -111,47 +115,68 @@ func (*Field) ProtoMessage() {}
func (*Field) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{3} } func (*Field) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{3} }
func (m *Field) GetID() int32 { func (m *Field) GetID() int32 {
if m != nil && m.ID != nil { if m != nil {
return *m.ID return m.ID
} }
return 0 return 0
} }
func (m *Field) GetName() string { func (m *Field) GetName() string {
if m != nil && m.Name != nil { if m != nil {
return *m.Name return m.Name
} }
return "" return ""
} }
func (m *Field) GetType() int32 { func (m *Field) GetType() int32 {
if m != nil && m.Type != nil { if m != nil {
return *m.Type return m.Type
} }
return 0 return 0
} }
type MeasurementFieldSet struct {
Measurements []*MeasurementFields `protobuf:"bytes,1,rep,name=Measurements" json:"Measurements,omitempty"`
}
func (m *MeasurementFieldSet) Reset() { *m = MeasurementFieldSet{} }
func (m *MeasurementFieldSet) String() string { return proto.CompactTextString(m) }
func (*MeasurementFieldSet) ProtoMessage() {}
func (*MeasurementFieldSet) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{4} }
func (m *MeasurementFieldSet) GetMeasurements() []*MeasurementFields {
if m != nil {
return m.Measurements
}
return nil
}
func init() { func init() {
proto.RegisterType((*Series)(nil), "meta.Series") proto.RegisterType((*Series)(nil), "tsdb.Series")
proto.RegisterType((*Tag)(nil), "meta.Tag") proto.RegisterType((*Tag)(nil), "tsdb.Tag")
proto.RegisterType((*MeasurementFields)(nil), "meta.MeasurementFields") proto.RegisterType((*MeasurementFields)(nil), "tsdb.MeasurementFields")
proto.RegisterType((*Field)(nil), "meta.Field") proto.RegisterType((*Field)(nil), "tsdb.Field")
proto.RegisterType((*MeasurementFieldSet)(nil), "tsdb.MeasurementFieldSet")
} }
func init() { proto.RegisterFile("internal/meta.proto", fileDescriptorMeta) } func init() { proto.RegisterFile("internal/meta.proto", fileDescriptorMeta) }
var fileDescriptorMeta = []byte{ var fileDescriptorMeta = []byte{
// 180 bytes of a gzipped FileDescriptorProto // 242 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x54, 0x8c, 0xbd, 0xca, 0xc2, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xbd, 0x4b, 0x03, 0x41,
0x14, 0x40, 0x69, 0xd2, 0x16, 0x7a, 0xfb, 0x7d, 0x83, 0x71, 0x30, 0xe0, 0x52, 0x33, 0x75, 0x6a, 0x10, 0xc5, 0xb9, 0xbd, 0x0f, 0xc8, 0x44, 0x44, 0x27, 0x82, 0xdb, 0x08, 0x61, 0x6d, 0xd2, 0x78,
0xc5, 0x67, 0x10, 0x41, 0x44, 0x17, 0x83, 0xfb, 0x05, 0x2f, 0xa5, 0xd0, 0x3f, 0x92, 0x74, 0xe8, 0x82, 0x56, 0x62, 0x61, 0x13, 0x84, 0xe0, 0x47, 0xb1, 0x39, 0xec, 0x27, 0x64, 0x38, 0x0e, 0xee,
0xdb, 0x4b, 0x52, 0x17, 0xb7, 0x73, 0xee, 0xcf, 0x81, 0x6d, 0x3b, 0x38, 0x32, 0x03, 0x76, 0x75, 0x2e, 0x61, 0x77, 0x53, 0xe4, 0xbf, 0x97, 0xcc, 0x1e, 0x12, 0x35, 0xdd, 0xdb, 0x37, 0xf3, 0xe6,
0x4f, 0x0e, 0xab, 0xc9, 0x8c, 0x6e, 0x14, 0xb1, 0x67, 0x55, 0x41, 0xfa, 0x24, 0xd3, 0x92, 0x15, 0xfd, 0x58, 0x98, 0x34, 0x7d, 0x60, 0xd7, 0x53, 0x7b, 0xdf, 0x71, 0xa0, 0x72, 0xeb, 0x36, 0x61,
0x39, 0xf0, 0x1b, 0x2d, 0x32, 0x2a, 0x58, 0x99, 0x89, 0x1d, 0xc4, 0x1a, 0x1b, 0x2b, 0x59, 0xc1, 0x83, 0x59, 0xf0, 0xeb, 0x95, 0x79, 0x82, 0x62, 0xc9, 0xae, 0x61, 0x8f, 0x17, 0x90, 0xbe, 0xf1,
0xcb, 0xfc, 0x94, 0x55, 0xe1, 0x4f, 0x63, 0xa3, 0x0e, 0xc0, 0x35, 0x36, 0xbf, 0xc7, 0xff, 0x90, 0x5e, 0x27, 0xd3, 0x64, 0x36, 0xb2, 0x07, 0x89, 0x37, 0x90, 0x55, 0x54, 0x7b, 0xad, 0xa6, 0xe9,
0xbc, 0xb0, 0x9b, 0x49, 0x32, 0xaf, 0xea, 0x08, 0x9b, 0x3b, 0xa1, 0x9d, 0x0d, 0xf5, 0x34, 0xb8, 0x6c, 0xfc, 0x30, 0x2a, 0x0f, 0x81, 0xb2, 0xa2, 0xda, 0x8a, 0x6d, 0xee, 0x20, 0xad, 0xa8, 0x3e,
0x4b, 0x4b, 0xdd, 0xdb, 0x8a, 0x3d, 0xa4, 0x2b, 0xc9, 0x28, 0x24, 0xf3, 0x35, 0x19, 0x66, 0xaa, 0x91, 0xbb, 0x82, 0xfc, 0x8b, 0xda, 0x1d, 0x6b, 0x25, 0x5e, 0x7c, 0x98, 0x77, 0xb8, 0xfc, 0x60,
0x86, 0x24, 0x80, 0x00, 0x60, 0xd7, 0x73, 0xa8, 0x26, 0xe2, 0x0f, 0xe2, 0x07, 0xf6, 0xdf, 0xa8, 0xf2, 0x3b, 0xc7, 0x1d, 0xf7, 0xe1, 0xb5, 0xe1, 0x76, 0xed, 0x11, 0x21, 0xfb, 0xa4, 0x8e, 0x87,
0x37, 0xbd, 0x4c, 0x24, 0xb9, 0xdf, 0x7d, 0x02, 0x00, 0x00, 0xff, 0xff, 0x04, 0x3d, 0x58, 0x4a, 0xb4, 0x68, 0xbc, 0x85, 0x22, 0x4e, 0x87, 0xe2, 0x71, 0x2c, 0x16, 0xcf, 0x0e, 0x23, 0xf3, 0x02,
0xd1, 0x00, 0x00, 0x00, 0xb9, 0x28, 0x3c, 0x07, 0xb5, 0x98, 0x4b, 0x3e, 0xb7, 0x6a, 0x31, 0xff, 0xb9, 0xa8, 0x8e, 0x2e,
0x22, 0x64, 0xd5, 0x7e, 0xcb, 0x3a, 0x95, 0x2d, 0xd1, 0xc6, 0xc2, 0xe4, 0x2f, 0xce, 0x92, 0x03,
0x3e, 0xc3, 0xd9, 0x91, 0xed, 0x75, 0x22, 0x08, 0xd7, 0x11, 0xe1, 0x1f, 0xbf, 0xfd, 0xb5, 0xbc,
0x2a, 0xe4, 0x67, 0x1f, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xac, 0xee, 0x08, 0x52, 0x70, 0x01,
0x00, 0x00,
} }

View File

@ -1,4 +1,6 @@
package meta; syntax = "proto3";
package tsdb;
//======================================================================== //========================================================================
// //
@ -7,21 +9,25 @@ package meta;
//======================================================================== //========================================================================
message Series { message Series {
required string Key = 1; string Key = 1;
repeated Tag Tags = 2; repeated Tag Tags = 2;
} }
message Tag { message Tag {
required string Key = 1; string Key = 1;
required string Value = 2; string Value = 2;
} }
message MeasurementFields { message MeasurementFields {
repeated Field Fields = 1; string Name = 1;
repeated Field Fields = 2;
} }
message Field { message Field {
required int32 ID = 1; string Name = 1;
required string Name = 2; int32 Type = 2;
required int32 Type = 3; }
}
message MeasurementFieldSet {
repeated MeasurementFields Measurements = 1;
}

View File

@ -6,6 +6,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"runtime" "runtime"
@ -18,6 +20,7 @@ import (
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/file"
"github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/query"
internal "github.com/influxdata/influxdb/tsdb/internal" internal "github.com/influxdata/influxdb/tsdb/internal"
@ -691,6 +694,10 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
s.index.SetFieldName(f.Measurement, f.Field.Name) s.index.SetFieldName(f.Measurement, f.Field.Name)
} }
if len(fieldsToCreate) > 0 {
return engine.MeasurementFieldSet().Save()
}
return nil return nil
} }
@ -1399,10 +1406,7 @@ func (m *MeasurementFields) MarshalBinary() ([]byte, error) {
var pb internal.MeasurementFields var pb internal.MeasurementFields
for _, f := range m.fields { for _, f := range m.fields {
id := int32(f.ID) pb.Fields = append(pb.Fields, &internal.Field{Name: f.Name, Type: int32(f.Type)})
name := f.Name
t := int32(f.Type)
pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t})
} }
return proto.Marshal(&pb) return proto.Marshal(&pb)
} }
@ -1418,7 +1422,7 @@ func (m *MeasurementFields) UnmarshalBinary(buf []byte) error {
} }
m.fields = make(map[string]*Field, len(pb.Fields)) m.fields = make(map[string]*Field, len(pb.Fields))
for _, f := range pb.Fields { for _, f := range pb.Fields {
m.fields[f.GetName()] = &Field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())} m.fields[f.GetName()] = &Field{Name: f.GetName(), Type: influxql.DataType(f.GetType())}
} }
return nil return nil
} }
@ -1507,6 +1511,16 @@ func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
return fields return fields
} }
func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for name, f := range m.fields {
if !fn(name, f.Type) {
return
}
}
}
// Clone returns copy of the MeasurementFields // Clone returns copy of the MeasurementFields
func (m *MeasurementFields) Clone() *MeasurementFields { func (m *MeasurementFields) Clone() *MeasurementFields {
m.mu.RLock() m.mu.RLock()
@ -1525,13 +1539,21 @@ func (m *MeasurementFields) Clone() *MeasurementFields {
type MeasurementFieldSet struct { type MeasurementFieldSet struct {
mu sync.RWMutex mu sync.RWMutex
fields map[string]*MeasurementFields fields map[string]*MeasurementFields
// path is the location to persist field sets
path string
} }
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet. // NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet() *MeasurementFieldSet { func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
return &MeasurementFieldSet{ fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields), fields: make(map[string]*MeasurementFields),
path: path,
} }
if err := fs.load(); err != nil {
return nil, err
}
return fs, nil
} }
// Fields returns fields for a measurement by name. // Fields returns fields for a measurement by name.
@ -1582,6 +1604,106 @@ func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) erro
return nil return nil
} }
func (fs *MeasurementFieldSet) IsEmpty() bool {
fs.mu.RLock()
defer fs.mu.RUnlock()
return len(fs.fields) == 0
}
func (fs *MeasurementFieldSet) Save() error {
fs.mu.Lock()
defer fs.mu.Unlock()
return fs.saveNoLock()
}
func (fs *MeasurementFieldSet) saveNoLock() error {
// No fields left, remove the fields index file
if len(fs.fields) == 0 {
return os.RemoveAll(fs.path)
}
// Write the new index to a temp file and rename when it's sync'd
path := fs.path + ".tmp"
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
}
defer os.RemoveAll(path)
pb := internal.MeasurementFieldSet{
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
}
for name, mf := range fs.fields {
fs := &internal.MeasurementFields{
Name: name,
Fields: make([]*internal.Field, 0, mf.FieldN()),
}
mf.ForEachField(func(field string, typ influxql.DataType) bool {
fs.Fields = append(fs.Fields, &internal.Field{Name: field, Type: int32(typ)})
return true
})
pb.Measurements = append(pb.Measurements, fs)
}
b, err := proto.Marshal(&pb)
if err != nil {
return err
}
if _, err := fd.Write(b); err != nil {
return err
}
if err = fd.Sync(); err != nil {
return err
}
//close file handle before renaming to support Windows
if err = fd.Close(); err != nil {
return err
}
return file.RenameFile(path, fs.path)
}
func (fs *MeasurementFieldSet) load() error {
fs.mu.Lock()
defer fs.mu.Unlock()
fd, err := os.Open(fs.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer fd.Close()
var pb internal.MeasurementFieldSet
b, err := ioutil.ReadAll(fd)
if err != nil {
return err
}
if err := proto.Unmarshal(b, &pb); err != nil {
return err
}
fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
for _, measurement := range pb.GetMeasurements() {
set := &MeasurementFields{
fields: make(map[string]*Field, len(measurement.GetFields())),
}
for _, field := range measurement.GetFields() {
set.fields[field.GetName()] = &Field{Name: field.GetName(), Type: influxql.DataType(field.GetType())}
}
fs.fields[measurement.GetName()] = set
}
return nil
}
// Field represents a series field. // Field represents a series field.
type Field struct { type Field struct {
ID uint8 `json:"id,omitempty"` ID uint8 `json:"id,omitempty"`

View File

@ -1402,6 +1402,86 @@ _reserved,region=uswest value="foo" 0
} }
} }
func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()
path := filepath.Join(dir, "fields.idx")
mf, err := tsdb.NewMeasurementFieldSet(path)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float, true); err != nil {
t.Fatalf("create field error: %v", err)
}
if err := mf.Save(); err != nil {
t.Fatalf("save error: %v", err)
}
mf, err = tsdb.NewMeasurementFieldSet(path)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
fields = mf.Fields("cpu")
field := fields.Field("value")
if field == nil {
t.Fatalf("field is null")
}
if got, exp := field.Type, influxql.Float; got != exp {
t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
}
}
func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()
path := filepath.Join(dir, "fields.idx")
mf, err := tsdb.NewMeasurementFieldSet(path)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float, true); err != nil {
t.Fatalf("create field error: %v", err)
}
if err := mf.Save(); err != nil {
t.Fatalf("save error: %v", err)
}
mf, err = tsdb.NewMeasurementFieldSet(path)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
fields = mf.Fields("cpu")
field := fields.Field("value")
if field == nil {
t.Fatalf("field is null")
}
if got, exp := field.Type, influxql.Float; got != exp {
t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
}
mf.Delete("cpu")
if err := mf.Save(); err != nil {
t.Fatalf("save after delete error: %v", err)
}
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Fatalf("got %v, not exist err", err)
}
}
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) } func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) }
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) } func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) }
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) } func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) }
@ -1693,3 +1773,11 @@ func (sh *Shard) MustWritePointsString(s string) {
panic(err) panic(err)
} }
} }
func MustTempDir() (string, func()) {
dir, err := ioutil.TempDir("", "shard-test")
if err != nil {
panic(fmt.Sprintf("failed to create temp dir: %v", err))
}
return dir, func() { os.RemoveAll(dir) }
}