From 619eb1cae68689854b4e1489cd123c7490147458 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Mon, 25 Jul 2022 10:53:09 -0700 Subject: [PATCH] fix: restore in-memory Manifest on write error (#23552) (#23578) Do not update the `FileSet` or `activeLogFile` field in the in-memory Partition structure if the Manifest file is not correctly saved to the disk. closes https://github.com/influxdata/influxdb/issues/23553 (cherry picked from commit a8732dcf524053c972fcfabb38f528e7ef3eba5a) closes https://github.com/influxdata/influxdb/issues/23554 --- tsdb/index/tsi1/index.go | 24 +++--- tsdb/index/tsi1/partition.go | 118 ++++++++++++++++++++++-------- tsdb/index/tsi1/partition_test.go | 65 ++++++++++++++++ 3 files changed, 166 insertions(+), 41 deletions(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 82552ebe4e..09bca94299 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -309,13 +309,7 @@ func (i *Index) Open() (rErr error) { func (i *Index) cleanUpFail(err *error) { if nil != *err { - for _, p := range i.partitions { - if (p != nil) && p.IsOpen() { - if e := p.Close(); e != nil { - i.logger.Warn("Failed to clean up partition") - } - } - } + i.close() } } @@ -352,16 +346,24 @@ func (i *Index) Close() error { // Lock index and close partitions. i.mu.Lock() defer i.mu.Unlock() + return i.close() +} +// close closes the index without locking +func (i *Index) close() (rErr error) { for _, p := range i.partitions { - if err := p.Close(); err != nil { - return err + if (p != nil) && p.IsOpen() { + if pErr := p.Close(); pErr != nil { + i.logger.Warn("Failed to clean up partition", zap.String("path", p.Path())) + if rErr == nil { + rErr = pErr + } + } } } - // Mark index as closed. i.opened = false - return nil + return rErr } // Path returns the path the index was opened with. 
diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 394548976b..687c0be8d8 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -86,11 +86,13 @@ type Partition struct { // Index's version. version int + + manifestPathFn func() string } // NewPartition returns a new instance of Partition. func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { - return &Partition{ + p := &Partition{ closing: make(chan struct{}), path: path, sfile: sfile, @@ -104,6 +106,8 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { logger: zap.NewNop(), version: Version, } + p.manifestPathFn = p.manifestPath + return p } // bytes estimates the memory footprint of this Partition, in bytes. @@ -408,27 +412,44 @@ func (p *Partition) nextSequence() int { return p.seq } -// ManifestPath returns the path to the index's manifest file. func (p *Partition) ManifestPath() string { + return p.manifestPathFn() +} + +// manifestPath returns the path to the index's manifest file. +func (p *Partition) manifestPath() string { return filepath.Join(p.path, ManifestFileName) } // Manifest returns a manifest for the index. 
func (p *Partition) Manifest() *Manifest { + return p.manifest(p.fileSet) +} + +// manifest returns a manifest for the index, possibly using a +// new FileSet to account for compaction or log prepending +func (p *Partition) manifest(newFileSet *FileSet) *Manifest { m := &Manifest{ Levels: p.levels, - Files: make([]string, len(p.fileSet.files)), + Files: make([]string, len(newFileSet.files)), Version: p.version, path: p.ManifestPath(), } - for j, f := range p.fileSet.files { + for j, f := range newFileSet.files { m.Files[j] = filepath.Base(f.Path()) } return m } +// SetManifestPathForTest is only to force a bad path in testing +func (p *Partition) SetManifestPathForTest(path string) { + p.mu.Lock() + defer p.mu.Unlock() + p.manifestPathFn = func() string { return path } +} + // WithLogger sets the logger for the index. func (p *Partition) WithLogger(logger *zap.Logger) { p.logger = logger.With(zap.String("index", "tsi")) @@ -468,28 +489,42 @@ func (p *Partition) retainFileSet() *FileSet { } // FileN returns the active files in the file set. -func (p *Partition) FileN() int { return len(p.fileSet.files) } +func (p *Partition) FileN() int { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.fileSet.files) +} // prependActiveLogFile adds a new log file so that the current log file can be compacted. -func (p *Partition) prependActiveLogFile() error { +func (p *Partition) prependActiveLogFile() (rErr error) { // Open file and insert it into the first position. f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence()))) if err != nil { return err } - p.activeLogFile = f + var oldActiveFile *LogFile + p.activeLogFile, oldActiveFile = f, p.activeLogFile - // Prepend and generate new fileset. - p.fileSet = p.fileSet.PrependLogFile(f) + // Prepend and generate new fileset but do not yet update the partition + newFileSet := p.fileSet.PrependLogFile(f) + + errors2.Capture(&rErr, func() error { + if rErr != nil { + // close the new file. 
+ f.Close() + p.activeLogFile = oldActiveFile + } + return rErr + })() // Write new manifest. - manifestSize, err := p.Manifest().Write() + manifestSize, err := p.manifest(newFileSet).Write() if err != nil { - // TODO: Close index if write fails. - p.logger.Error("manifest write failed, index is potentially damaged", zap.Error(err)) - return err + return fmt.Errorf("manifest write failed for %q: %w", p.ManifestPath(), err) } p.manifestSize = manifestSize + // Store the new FileSet in the partition now that the manifest has been written + p.fileSet = newFileSet return nil } @@ -1113,20 +1148,28 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch } // Obtain lock to swap in index file and write manifest. - if err := func() error { + if err := func() (rErr error) { p.mu.Lock() defer p.mu.Unlock() // Replace previous files with new index file. - p.fileSet = p.fileSet.MustReplace(IndexFiles(files).Files(), file) + newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file) // Write new manifest. - manifestSize, err := p.Manifest().Write() + manifestSize, err := p.manifest(newFileSet).Write() + defer errors2.Capture(&rErr, func() error { + if rErr != nil { + // Close the new file to avoid leaks. + file.Close() + } + return rErr + })() if err != nil { - // TODO: Close index if write fails. - return err + return fmt.Errorf("manifest file write failed compacting index %q: %w", p.ManifestPath(), err) } p.manifestSize = manifestSize + // Store the new FileSet in the partition now that the manifest has been written + p.fileSet = newFileSet return nil }(); err != nil { log.Error("Cannot write manifest", zap.Error(err)) @@ -1262,20 +1305,29 @@ func (p *Partition) compactLogFile(logFile *LogFile) { } // Obtain lock to swap in index file and write manifest. - if err := func() error { + if err := func() (rErr error) { p.mu.Lock() defer p.mu.Unlock() // Replace previous log file with index file. 
- p.fileSet = p.fileSet.MustReplace([]File{logFile}, file) + newFileSet := p.fileSet.MustReplace([]File{logFile}, file) + + defer errors2.Capture(&rErr, func() error { + if rErr != nil { + // close new file + file.Close() + } + return rErr + })() // Write new manifest. - manifestSize, err := p.Manifest().Write() - if err != nil { - // TODO: Close index if write fails. - return err - } + manifestSize, err := p.manifest(newFileSet).Write() + if err != nil { + return fmt.Errorf("manifest file write failed compacting log file %q: %w", p.ManifestPath(), err) + } + // Store the new FileSet in the partition now that the manifest has been written + p.fileSet = newFileSet p.manifestSize = manifestSize return nil }(); err != nil { @@ -1389,6 +1441,7 @@ func (m *Manifest) Validate() error { // Write writes the manifest file to the provided path, returning the number of // bytes written and an error, if any. func (m *Manifest) Write() (int64, error) { + var tmp string buf, err := json.MarshalIndent(m, "", " ") if err != nil { return 0, fmt.Errorf("failed marshaling %q: %w", m.path, err) @@ -1401,25 +1454,30 @@ func (m *Manifest) Write() (int64, error) { return 0, err } - tmp := f.Name() // In correct operation, Remove() should fail because the file was renamed defer os.Remove(tmp) + err = func() (rErr error) { // Close() before rename for Windows defer errors2.Capture(&rErr, f.Close)() + + tmp = f.Name() + + if err = f.Chmod(0666); err != nil { + return fmt.Errorf("failed setting permissions on manifest file %q: %w", tmp, err) + } if _, err = f.Write(buf); err != nil { return fmt.Errorf("failed writing temporary manifest file %q: %w", tmp, err) } + if err = f.Sync(); err != nil { + return fmt.Errorf("failed syncing temporary manifest file to disk %q: %w", tmp, err) + } return nil }() if err != nil { return 0, err } - if err = os.Chmod(tmp, 0666); err != nil { - return 0, err - } - if err = os.Rename(tmp, m.path); err != nil { return 0, err } diff --git 
a/tsdb/index/tsi1/partition_test.go b/tsdb/index/tsi1/partition_test.go index bc1cfec8bd..0c7c206453 100644 --- a/tsdb/index/tsi1/partition_test.go +++ b/tsdb/index/tsi1/partition_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "syscall" "testing" "github.com/influxdata/influxdb/v2/tsdb" @@ -82,6 +83,70 @@ func TestPartition_Manifest(t *testing.T) { }) } +var badManifestPath string = filepath.Join(os.DevNull, tsi1.ManifestFileName) + +func TestPartition_Manifest_Write_Fail(t *testing.T) { + t.Run("write MANIFEST", func(t *testing.T) { + m := tsi1.NewManifest(badManifestPath) + _, err := m.Write() + if !errors.Is(err, syscall.ENOTDIR) { + t.Fatalf("expected: syscall.ENOTDIR, got %T: %v", err, err) + } + }) +} + +func TestPartition_PrependLogFile_Write_Fail(t *testing.T) { + t.Run("write MANIFEST", func(t *testing.T) { + sfile := MustOpenSeriesFile() + defer sfile.Close() + + p := MustOpenPartition(sfile.SeriesFile) + defer func() { + if err := p.Close(); err != nil { + t.Fatalf("error closing partition: %v", err) + } + }() + p.Partition.MaxLogFileSize = -1 + fileN := p.FileN() + p.CheckLogFile() + if fileN >= p.FileN() { + t.Fatalf("manifest write prepending log file should have succeeded but number of files did not change correctly: expected more than %d files, got %d files", fileN, p.FileN()) + } + p.SetManifestPathForTest(badManifestPath) + fileN = p.FileN() + p.CheckLogFile() + if fileN != p.FileN() { + t.Fatalf("manifest write prepending log file should have failed, but number of files changed: expected %d files, got %d files", fileN, p.FileN()) + } + }) +} + +func TestPartition_Compact_Write_Fail(t *testing.T) { + t.Run("write MANIFEST", func(t *testing.T) { + sfile := MustOpenSeriesFile() + defer sfile.Close() + + p := MustOpenPartition(sfile.SeriesFile) + defer func() { + if err := p.Close(); err != nil { + t.Fatalf("error closing partition: %v", err) + } + }() + p.Partition.MaxLogFileSize = -1 + fileN := p.FileN() + p.Compact() + if (1 + 
fileN) != p.FileN() { + t.Fatalf("manifest write in compaction should have succeeded, but number of files did not change correctly: expected %d files, got %d files", fileN+1, p.FileN()) + } + p.SetManifestPathForTest(badManifestPath) + fileN = p.FileN() + p.Compact() + if fileN != p.FileN() { + t.Fatalf("manifest write should have failed the compaction, but number of files changed: expected %d files, got %d files", fileN, p.FileN()) + } + }) +} + // Partition is a test wrapper for tsi1.Partition. type Partition struct { *tsi1.Partition