Merge pull request #9323 from influxdata/jw-fields-corrupt

Rebuild corrupted fields index when necessary
pull/9326/head
Jason Wilder 2018-01-16 13:35:49 -07:00 committed by GitHub
commit f7e554ed52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 3 deletions

View File

@ -635,7 +635,7 @@ func (e *Engine) Open() error {
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"))
if err != nil {
return err
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
}
e.mu.Lock()

View File

@ -1478,8 +1478,11 @@ func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
fields: make(map[string]*MeasurementFields),
path: path,
}
// If there is a load error, return the error and an empty set so
// it can be rebuilt manually.
if err := fs.load(); err != nil {
return nil, err
return fs, err
}
return fs, nil
}
@ -1598,7 +1601,11 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
return err
}
return file.RenameFile(path, fs.path)
if err := file.RenameFile(path, fs.path); err != nil {
return err
}
return file.SyncDir(filepath.Dir(fs.path))
}
func (fs *MeasurementFieldSet) load() error {

View File

@ -102,6 +102,57 @@ func TestShardWriteAndIndex(t *testing.T) {
}
}
// TestShard_Open_CorruptFieldsIndex verifies that a shard can be reopened
// after its fields.idx file has been truncated on disk, i.e. that a damaged
// fields index does not prevent Open from succeeding.
func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "shard_test")
	if err != nil {
		t.Fatalf("error creating temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	tmpShard := path.Join(tmpDir, "shard")
	tmpWal := path.Join(tmpDir, "wal")

	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	opts := tsdb.NewEngineOptions()
	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
	opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir), sfile.SeriesFile)

	sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)

	// Calling WritePoints when the engine is not open will return
	// ErrEngineClosed.
	if got, exp := sh.WritePoints(nil), tsdb.ErrEngineClosed; got != exp {
		t.Fatalf("got %v, expected %v", got, exp)
	}

	if err := sh.Open(); err != nil {
		t.Fatalf("error opening shard: %s", err.Error())
	}

	pt := models.MustNewPoint(
		"cpu",
		models.Tags{{Key: []byte("host"), Value: []byte("server")}},
		map[string]interface{}{"value": 1.0},
		time.Unix(1, 2),
	)

	// Use t.Fatal rather than t.Fatalf(err.Error()) so that a '%' in the
	// error text is never interpreted as a format verb.
	if err := sh.WritePoints([]models.Point{pt}); err != nil {
		t.Fatal(err)
	}

	if err := sh.Close(); err != nil {
		t.Fatalf("close shard error: %v", err)
	}

	// Cut the on-disk fields index down to its first 6 bytes to simulate
	// corruption. The variable is not named "path" to avoid shadowing the
	// imported path package used above.
	fieldsPath := filepath.Join(tmpShard, "fields.idx")
	if err := os.Truncate(fieldsPath, 6); err != nil {
		t.Fatalf("truncate shard error: %v", err)
	}

	// Reopening must succeed despite the truncated index.
	if err := sh.Open(); err != nil {
		t.Fatalf("error opening shard: %s", err.Error())
	}

	// Release the reopened shard before the deferred temp dir removal runs.
	if err := sh.Close(); err != nil {
		t.Fatalf("close shard error: %v", err)
	}
}
func TestMaxSeriesLimit(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
@ -1530,6 +1581,45 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
}
}
// TestMeasurementFieldSet_Corrupt verifies that loading a truncated
// fields.idx file reports an error and yields a set that contains none of
// the previously saved fields.
func TestMeasurementFieldSet_Corrupt(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	// Not named "path" so the imported path package is not shadowed.
	idxPath := filepath.Join(dir, "fields.idx")
	mf, err := tsdb.NewMeasurementFieldSet(idxPath)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}

	fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
	if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil {
		t.Fatalf("create field error: %v", err)
	}

	if err := mf.Save(); err != nil {
		t.Fatalf("save error: %v", err)
	}

	stat, err := os.Stat(idxPath)
	if err != nil {
		t.Fatalf("stat error: %v", err)
	}

	// Truncate the file to simulate a corrupted file.
	if err := os.Truncate(idxPath, stat.Size()-3); err != nil {
		t.Fatalf("truncate error: %v", err)
	}

	mf, err = tsdb.NewMeasurementFieldSet(idxPath)
	if err == nil {
		t.Fatal("NewMeasurementFieldSet expected error")
	}

	// A set must be returned alongside the error so callers can rebuild it;
	// fail cleanly here instead of panicking on a nil receiver below.
	if mf == nil {
		t.Fatal("NewMeasurementFieldSet expected non-nil set on load error")
	}

	fields = mf.Fields("cpu")
	if fields != nil {
		t.Fatal("expected fields to be nil")
	}
}
func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()