Do not update the `FileSet` or `activeLogFile` fields in the in-memory
Partition structure if the Manifest file is not correctly saved to
disk.
closes https://github.com/influxdata/influxdb/issues/23553
(cherry picked from commit a8732dcf52)
closes https://github.com/influxdata/influxdb/issues/23554
pull/23586/head
parent
37562c7c00
commit
619eb1cae6
|
@ -309,13 +309,7 @@ func (i *Index) Open() (rErr error) {
|
|||
|
||||
func (i *Index) cleanUpFail(err *error) {
|
||||
if nil != *err {
|
||||
for _, p := range i.partitions {
|
||||
if (p != nil) && p.IsOpen() {
|
||||
if e := p.Close(); e != nil {
|
||||
i.logger.Warn("Failed to clean up partition")
|
||||
}
|
||||
}
|
||||
}
|
||||
i.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -352,16 +346,24 @@ func (i *Index) Close() error {
|
|||
// Lock index and close partitions.
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
return i.close()
|
||||
}
|
||||
|
||||
// close closes the index without locking
|
||||
func (i *Index) close() (rErr error) {
|
||||
for _, p := range i.partitions {
|
||||
if err := p.Close(); err != nil {
|
||||
return err
|
||||
if (p != nil) && p.IsOpen() {
|
||||
if pErr := p.Close(); pErr != nil {
|
||||
i.logger.Warn("Failed to clean up partition", zap.String("path", p.Path()))
|
||||
if rErr == nil {
|
||||
rErr = pErr
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mark index as closed.
|
||||
i.opened = false
|
||||
return nil
|
||||
return rErr
|
||||
}
|
||||
|
||||
// Path returns the path the index was opened with.
|
||||
|
|
|
@ -86,11 +86,13 @@ type Partition struct {
|
|||
|
||||
// Index's version.
|
||||
version int
|
||||
|
||||
manifestPathFn func() string
|
||||
}
|
||||
|
||||
// NewPartition returns a new instance of Partition.
|
||||
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
|
||||
return &Partition{
|
||||
p := &Partition{
|
||||
closing: make(chan struct{}),
|
||||
path: path,
|
||||
sfile: sfile,
|
||||
|
@ -104,6 +106,8 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
|
|||
logger: zap.NewNop(),
|
||||
version: Version,
|
||||
}
|
||||
p.manifestPathFn = p.manifestPath
|
||||
return p
|
||||
}
|
||||
|
||||
// bytes estimates the memory footprint of this Partition, in bytes.
|
||||
|
@ -408,27 +412,44 @@ func (p *Partition) nextSequence() int {
|
|||
return p.seq
|
||||
}
|
||||
|
||||
// ManifestPath returns the path to the index's manifest file.
|
||||
func (p *Partition) ManifestPath() string {
|
||||
return p.manifestPathFn()
|
||||
}
|
||||
|
||||
// manifestPath returns the default path to the index's manifest file, inside the partition's directory.
|
||||
func (p *Partition) manifestPath() string {
|
||||
return filepath.Join(p.path, ManifestFileName)
|
||||
}
|
||||
|
||||
// Manifest returns a manifest for the index.
|
||||
func (p *Partition) Manifest() *Manifest {
|
||||
return p.manifest(p.fileSet)
|
||||
}
|
||||
|
||||
// manifest returns a manifest for the index, possibly using a
|
||||
// new FileSet to account for compaction or log prepending
|
||||
func (p *Partition) manifest(newFileSet *FileSet) *Manifest {
|
||||
m := &Manifest{
|
||||
Levels: p.levels,
|
||||
Files: make([]string, len(p.fileSet.files)),
|
||||
Files: make([]string, len(newFileSet.files)),
|
||||
Version: p.version,
|
||||
path: p.ManifestPath(),
|
||||
}
|
||||
|
||||
for j, f := range p.fileSet.files {
|
||||
for j, f := range newFileSet.files {
|
||||
m.Files[j] = filepath.Base(f.Path())
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// SetManifestPathForTest is only to force a bad path in testing
|
||||
func (p *Partition) SetManifestPathForTest(path string) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.manifestPathFn = func() string { return path }
|
||||
}
|
||||
|
||||
// WithLogger sets the logger for the index.
|
||||
func (p *Partition) WithLogger(logger *zap.Logger) {
|
||||
p.logger = logger.With(zap.String("index", "tsi"))
|
||||
|
@ -468,28 +489,42 @@ func (p *Partition) retainFileSet() *FileSet {
|
|||
}
|
||||
|
||||
// FileN returns the number of active files in the file set.
|
||||
func (p *Partition) FileN() int { return len(p.fileSet.files) }
|
||||
func (p *Partition) FileN() int {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return len(p.fileSet.files)
|
||||
}
|
||||
|
||||
// prependActiveLogFile adds a new log file so that the current log file can be compacted.
|
||||
func (p *Partition) prependActiveLogFile() error {
|
||||
func (p *Partition) prependActiveLogFile() (rErr error) {
|
||||
// Open file and insert it into the first position.
|
||||
f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence())))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.activeLogFile = f
|
||||
var oldActiveFile *LogFile
|
||||
p.activeLogFile, oldActiveFile = f, p.activeLogFile
|
||||
|
||||
// Prepend and generate new fileset.
|
||||
p.fileSet = p.fileSet.PrependLogFile(f)
|
||||
// Prepend and generate new fileset but do not yet update the partition
|
||||
newFileSet := p.fileSet.PrependLogFile(f)
|
||||
|
||||
errors2.Capture(&rErr, func() error {
|
||||
if rErr != nil {
|
||||
// close the new file.
|
||||
f.Close()
|
||||
p.activeLogFile = oldActiveFile
|
||||
}
|
||||
return rErr
|
||||
})()
|
||||
|
||||
// Write new manifest.
|
||||
manifestSize, err := p.Manifest().Write()
|
||||
manifestSize, err := p.manifest(newFileSet).Write()
|
||||
if err != nil {
|
||||
// TODO: Close index if write fails.
|
||||
p.logger.Error("manifest write failed, index is potentially damaged", zap.Error(err))
|
||||
return err
|
||||
return fmt.Errorf("manifest write failed for %q: %w", p.ManifestPath(), err)
|
||||
}
|
||||
p.manifestSize = manifestSize
|
||||
// Store the new FileSet in the partition now that the manifest has been written
|
||||
p.fileSet = newFileSet
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1113,20 +1148,28 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
|
|||
}
|
||||
|
||||
// Obtain lock to swap in index file and write manifest.
|
||||
if err := func() error {
|
||||
if err := func() (rErr error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
// Replace previous files with new index file.
|
||||
p.fileSet = p.fileSet.MustReplace(IndexFiles(files).Files(), file)
|
||||
newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file)
|
||||
|
||||
// Write new manifest.
|
||||
manifestSize, err := p.Manifest().Write()
|
||||
manifestSize, err := p.manifest(newFileSet).Write()
|
||||
defer errors2.Capture(&rErr, func() error {
|
||||
if rErr != nil {
|
||||
// Close the new file to avoid leaks.
|
||||
file.Close()
|
||||
}
|
||||
return rErr
|
||||
})()
|
||||
if err != nil {
|
||||
// TODO: Close index if write fails.
|
||||
return err
|
||||
return fmt.Errorf("manifest file write failed compacting index %q: %w", p.ManifestPath(), err)
|
||||
}
|
||||
p.manifestSize = manifestSize
|
||||
// Store the new FileSet in the partition now that the manifest has been written
|
||||
p.fileSet = newFileSet
|
||||
return nil
|
||||
}(); err != nil {
|
||||
log.Error("Cannot write manifest", zap.Error(err))
|
||||
|
@ -1262,20 +1305,29 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
|
|||
}
|
||||
|
||||
// Obtain lock to swap in index file and write manifest.
|
||||
if err := func() error {
|
||||
if err := func() (rErr error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
// Replace previous log file with index file.
|
||||
p.fileSet = p.fileSet.MustReplace([]File{logFile}, file)
|
||||
newFileSet := p.fileSet.MustReplace([]File{logFile}, file)
|
||||
|
||||
defer errors2.Capture(&rErr, func() error {
|
||||
if rErr != nil {
|
||||
// close new file
|
||||
file.Close()
|
||||
}
|
||||
return rErr
|
||||
})()
|
||||
|
||||
// Write new manifest.
|
||||
manifestSize, err := p.Manifest().Write()
|
||||
if err != nil {
|
||||
// TODO: Close index if write fails.
|
||||
return err
|
||||
}
|
||||
manifestSize, err := p.manifest(newFileSet).Write()
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("manifest file write failed compacting log file %q: %w", p.ManifestPath(), err)
|
||||
}
|
||||
// Store the new FileSet in the partition now that the manifest has been written
|
||||
p.fileSet = newFileSet
|
||||
p.manifestSize = manifestSize
|
||||
return nil
|
||||
}(); err != nil {
|
||||
|
@ -1389,6 +1441,7 @@ func (m *Manifest) Validate() error {
|
|||
// Write writes the manifest file to the provided path, returning the number of
|
||||
// bytes written and an error, if any.
|
||||
func (m *Manifest) Write() (int64, error) {
|
||||
var tmp string
|
||||
buf, err := json.MarshalIndent(m, "", " ")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed marshaling %q: %w", m.path, err)
|
||||
|
@ -1401,25 +1454,30 @@ func (m *Manifest) Write() (int64, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
tmp := f.Name()
|
||||
// In correct operation, Remove() should fail because the file was renamed
|
||||
defer os.Remove(tmp)
|
||||
|
||||
err = func() (rErr error) {
|
||||
// Close() before rename for Windows
|
||||
defer errors2.Capture(&rErr, f.Close)()
|
||||
|
||||
tmp = f.Name()
|
||||
|
||||
if err = f.Chmod(0666); err != nil {
|
||||
return fmt.Errorf("failed setting permissions on manifest file %q: %w", tmp, err)
|
||||
}
|
||||
if _, err = f.Write(buf); err != nil {
|
||||
return fmt.Errorf("failed writing temporary manifest file %q: %w", tmp, err)
|
||||
}
|
||||
if err = f.Sync(); err != nil {
|
||||
return fmt.Errorf("failed syncing temporary manifest file to disk %q: %w", tmp, err)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err = os.Chmod(tmp, 0666); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err = os.Rename(tmp, m.path); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
|
@ -82,6 +83,70 @@ func TestPartition_Manifest(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
var badManifestPath string = filepath.Join(os.DevNull, tsi1.ManifestFileName)
|
||||
|
||||
func TestPartition_Manifest_Write_Fail(t *testing.T) {
|
||||
t.Run("write MANIFEST", func(t *testing.T) {
|
||||
m := tsi1.NewManifest(badManifestPath)
|
||||
_, err := m.Write()
|
||||
if !errors.Is(err, syscall.ENOTDIR) {
|
||||
t.Fatalf("expected: syscall.ENOTDIR, got %T: %v", err, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPartition_PrependLogFile_Write_Fail(t *testing.T) {
|
||||
t.Run("write MANIFEST", func(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
p := MustOpenPartition(sfile.SeriesFile)
|
||||
defer func() {
|
||||
if err := p.Close(); err != nil {
|
||||
t.Fatalf("error closing partition: %v", err)
|
||||
}
|
||||
}()
|
||||
p.Partition.MaxLogFileSize = -1
|
||||
fileN := p.FileN()
|
||||
p.CheckLogFile()
|
||||
if fileN >= p.FileN() {
|
||||
t.Fatalf("manifest write prepending log file should have succeeded but number of files did not change correctly: expected more than %d files, got %d files", fileN, p.FileN())
|
||||
}
|
||||
p.SetManifestPathForTest(badManifestPath)
|
||||
fileN = p.FileN()
|
||||
p.CheckLogFile()
|
||||
if fileN != p.FileN() {
|
||||
t.Fatalf("manifest write prepending log file should have failed, but number of files changed: expected %d files, got %d files", fileN, p.FileN())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPartition_Compact_Write_Fail(t *testing.T) {
|
||||
t.Run("write MANIFEST", func(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
p := MustOpenPartition(sfile.SeriesFile)
|
||||
defer func() {
|
||||
if err := p.Close(); err != nil {
|
||||
t.Fatalf("error closing partition: %v", err)
|
||||
}
|
||||
}()
|
||||
p.Partition.MaxLogFileSize = -1
|
||||
fileN := p.FileN()
|
||||
p.Compact()
|
||||
if (1 + fileN) != p.FileN() {
|
||||
t.Fatalf("manifest write in compaction should have succeeded, but number of files did not change correctly: expected %d files, got %d files", fileN+1, p.FileN())
|
||||
}
|
||||
p.SetManifestPathForTest(badManifestPath)
|
||||
fileN = p.FileN()
|
||||
p.Compact()
|
||||
if fileN != p.FileN() {
|
||||
t.Fatalf("manifest write should have failed the compaction, but number of files changed: expected %d files, got %d files", fileN, p.FileN())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Partition is a test wrapper for tsi1.Partition.
|
||||
type Partition struct {
|
||||
*tsi1.Partition
|
||||
|
|
Loading…
Reference in New Issue