Do not update the `fileSet` or `activeLogFile` fields in the in-memory
Partition structure if the manifest file is not correctly saved to
disk.
closes https://github.com/influxdata/influxdb/issues/23553
(cherry picked from commit a8732dcf52)
closes https://github.com/influxdata/influxdb/issues/23554
pull/23586/head
parent
37562c7c00
commit
619eb1cae6
|
@ -309,13 +309,7 @@ func (i *Index) Open() (rErr error) {
|
||||||
|
|
||||||
func (i *Index) cleanUpFail(err *error) {
|
func (i *Index) cleanUpFail(err *error) {
|
||||||
if nil != *err {
|
if nil != *err {
|
||||||
for _, p := range i.partitions {
|
i.close()
|
||||||
if (p != nil) && p.IsOpen() {
|
|
||||||
if e := p.Close(); e != nil {
|
|
||||||
i.logger.Warn("Failed to clean up partition")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,16 +346,24 @@ func (i *Index) Close() error {
|
||||||
// Lock index and close partitions.
|
// Lock index and close partitions.
|
||||||
i.mu.Lock()
|
i.mu.Lock()
|
||||||
defer i.mu.Unlock()
|
defer i.mu.Unlock()
|
||||||
|
return i.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// close closes the index without locking
|
||||||
|
func (i *Index) close() (rErr error) {
|
||||||
for _, p := range i.partitions {
|
for _, p := range i.partitions {
|
||||||
if err := p.Close(); err != nil {
|
if (p != nil) && p.IsOpen() {
|
||||||
return err
|
if pErr := p.Close(); pErr != nil {
|
||||||
|
i.logger.Warn("Failed to clean up partition", zap.String("path", p.Path()))
|
||||||
|
if rErr == nil {
|
||||||
|
rErr = pErr
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark index as closed.
|
// Mark index as closed.
|
||||||
i.opened = false
|
i.opened = false
|
||||||
return nil
|
return rErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Path returns the path the index was opened with.
|
// Path returns the path the index was opened with.
|
||||||
|
|
|
@ -86,11 +86,13 @@ type Partition struct {
|
||||||
|
|
||||||
// Index's version.
|
// Index's version.
|
||||||
version int
|
version int
|
||||||
|
|
||||||
|
manifestPathFn func() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPartition returns a new instance of Partition.
|
// NewPartition returns a new instance of Partition.
|
||||||
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
|
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
|
||||||
return &Partition{
|
p := &Partition{
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
path: path,
|
path: path,
|
||||||
sfile: sfile,
|
sfile: sfile,
|
||||||
|
@ -104,6 +106,8 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
|
||||||
logger: zap.NewNop(),
|
logger: zap.NewNop(),
|
||||||
version: Version,
|
version: Version,
|
||||||
}
|
}
|
||||||
|
p.manifestPathFn = p.manifestPath
|
||||||
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
// bytes estimates the memory footprint of this Partition, in bytes.
|
// bytes estimates the memory footprint of this Partition, in bytes.
|
||||||
|
@ -408,27 +412,44 @@ func (p *Partition) nextSequence() int {
|
||||||
return p.seq
|
return p.seq
|
||||||
}
|
}
|
||||||
|
|
||||||
// ManifestPath returns the path to the index's manifest file.
|
|
||||||
func (p *Partition) ManifestPath() string {
|
func (p *Partition) ManifestPath() string {
|
||||||
|
return p.manifestPathFn()
|
||||||
|
}
|
||||||
|
|
||||||
|
// manifestPath returns the path to the index's manifest file.
|
||||||
|
func (p *Partition) manifestPath() string {
|
||||||
return filepath.Join(p.path, ManifestFileName)
|
return filepath.Join(p.path, ManifestFileName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Manifest returns a manifest for the index.
|
// Manifest returns a manifest for the index.
|
||||||
func (p *Partition) Manifest() *Manifest {
|
func (p *Partition) Manifest() *Manifest {
|
||||||
|
return p.manifest(p.fileSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
// manifest returns a manifest for the index, possibly using a
|
||||||
|
// new FileSet to account for compaction or log prepending
|
||||||
|
func (p *Partition) manifest(newFileSet *FileSet) *Manifest {
|
||||||
m := &Manifest{
|
m := &Manifest{
|
||||||
Levels: p.levels,
|
Levels: p.levels,
|
||||||
Files: make([]string, len(p.fileSet.files)),
|
Files: make([]string, len(newFileSet.files)),
|
||||||
Version: p.version,
|
Version: p.version,
|
||||||
path: p.ManifestPath(),
|
path: p.ManifestPath(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for j, f := range p.fileSet.files {
|
for j, f := range newFileSet.files {
|
||||||
m.Files[j] = filepath.Base(f.Path())
|
m.Files[j] = filepath.Base(f.Path())
|
||||||
}
|
}
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetManifestPathForTest is only to force a bad path in testing
|
||||||
|
func (p *Partition) SetManifestPathForTest(path string) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
p.manifestPathFn = func() string { return path }
|
||||||
|
}
|
||||||
|
|
||||||
// WithLogger sets the logger for the index.
|
// WithLogger sets the logger for the index.
|
||||||
func (p *Partition) WithLogger(logger *zap.Logger) {
|
func (p *Partition) WithLogger(logger *zap.Logger) {
|
||||||
p.logger = logger.With(zap.String("index", "tsi"))
|
p.logger = logger.With(zap.String("index", "tsi"))
|
||||||
|
@ -468,28 +489,42 @@ func (p *Partition) retainFileSet() *FileSet {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FileN returns the active files in the file set.
|
// FileN returns the active files in the file set.
|
||||||
func (p *Partition) FileN() int { return len(p.fileSet.files) }
|
func (p *Partition) FileN() int {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
return len(p.fileSet.files)
|
||||||
|
}
|
||||||
|
|
||||||
// prependActiveLogFile adds a new log file so that the current log file can be compacted.
|
// prependActiveLogFile adds a new log file so that the current log file can be compacted.
|
||||||
func (p *Partition) prependActiveLogFile() error {
|
func (p *Partition) prependActiveLogFile() (rErr error) {
|
||||||
// Open file and insert it into the first position.
|
// Open file and insert it into the first position.
|
||||||
f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence())))
|
f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence())))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.activeLogFile = f
|
var oldActiveFile *LogFile
|
||||||
|
p.activeLogFile, oldActiveFile = f, p.activeLogFile
|
||||||
|
|
||||||
// Prepend and generate new fileset.
|
// Prepend and generate new fileset but do not yet update the partition
|
||||||
p.fileSet = p.fileSet.PrependLogFile(f)
|
newFileSet := p.fileSet.PrependLogFile(f)
|
||||||
|
|
||||||
|
errors2.Capture(&rErr, func() error {
|
||||||
|
if rErr != nil {
|
||||||
|
// close the new file.
|
||||||
|
f.Close()
|
||||||
|
p.activeLogFile = oldActiveFile
|
||||||
|
}
|
||||||
|
return rErr
|
||||||
|
})()
|
||||||
|
|
||||||
// Write new manifest.
|
// Write new manifest.
|
||||||
manifestSize, err := p.Manifest().Write()
|
manifestSize, err := p.manifest(newFileSet).Write()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: Close index if write fails.
|
return fmt.Errorf("manifest write failed for %q: %w", p.ManifestPath(), err)
|
||||||
p.logger.Error("manifest write failed, index is potentially damaged", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
p.manifestSize = manifestSize
|
p.manifestSize = manifestSize
|
||||||
|
// Store the new FileSet in the partition now that the manifest has been written
|
||||||
|
p.fileSet = newFileSet
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1113,20 +1148,28 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Obtain lock to swap in index file and write manifest.
|
// Obtain lock to swap in index file and write manifest.
|
||||||
if err := func() error {
|
if err := func() (rErr error) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
// Replace previous files with new index file.
|
// Replace previous files with new index file.
|
||||||
p.fileSet = p.fileSet.MustReplace(IndexFiles(files).Files(), file)
|
newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file)
|
||||||
|
|
||||||
// Write new manifest.
|
// Write new manifest.
|
||||||
manifestSize, err := p.Manifest().Write()
|
manifestSize, err := p.manifest(newFileSet).Write()
|
||||||
|
defer errors2.Capture(&rErr, func() error {
|
||||||
|
if rErr != nil {
|
||||||
|
// Close the new file to avoid leaks.
|
||||||
|
file.Close()
|
||||||
|
}
|
||||||
|
return rErr
|
||||||
|
})()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: Close index if write fails.
|
return fmt.Errorf("manifest file write failed compacting index %q: %w", p.ManifestPath(), err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
p.manifestSize = manifestSize
|
p.manifestSize = manifestSize
|
||||||
|
// Store the new FileSet in the partition now that the manifest has been written
|
||||||
|
p.fileSet = newFileSet
|
||||||
return nil
|
return nil
|
||||||
}(); err != nil {
|
}(); err != nil {
|
||||||
log.Error("Cannot write manifest", zap.Error(err))
|
log.Error("Cannot write manifest", zap.Error(err))
|
||||||
|
@ -1262,20 +1305,29 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Obtain lock to swap in index file and write manifest.
|
// Obtain lock to swap in index file and write manifest.
|
||||||
if err := func() error {
|
if err := func() (rErr error) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
// Replace previous log file with index file.
|
// Replace previous log file with index file.
|
||||||
p.fileSet = p.fileSet.MustReplace([]File{logFile}, file)
|
newFileSet := p.fileSet.MustReplace([]File{logFile}, file)
|
||||||
|
|
||||||
|
defer errors2.Capture(&rErr, func() error {
|
||||||
|
if rErr != nil {
|
||||||
|
// close new file
|
||||||
|
file.Close()
|
||||||
|
}
|
||||||
|
return rErr
|
||||||
|
})()
|
||||||
|
|
||||||
// Write new manifest.
|
// Write new manifest.
|
||||||
manifestSize, err := p.Manifest().Write()
|
manifestSize, err := p.manifest(newFileSet).Write()
|
||||||
if err != nil {
|
|
||||||
// TODO: Close index if write fails.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("manifest file write failed compacting log file %q: %w", p.ManifestPath(), err)
|
||||||
|
}
|
||||||
|
// Store the new FileSet in the partition now that the manifest has been written
|
||||||
|
p.fileSet = newFileSet
|
||||||
p.manifestSize = manifestSize
|
p.manifestSize = manifestSize
|
||||||
return nil
|
return nil
|
||||||
}(); err != nil {
|
}(); err != nil {
|
||||||
|
@ -1389,6 +1441,7 @@ func (m *Manifest) Validate() error {
|
||||||
// Write writes the manifest file to the provided path, returning the number of
|
// Write writes the manifest file to the provided path, returning the number of
|
||||||
// bytes written and an error, if any.
|
// bytes written and an error, if any.
|
||||||
func (m *Manifest) Write() (int64, error) {
|
func (m *Manifest) Write() (int64, error) {
|
||||||
|
var tmp string
|
||||||
buf, err := json.MarshalIndent(m, "", " ")
|
buf, err := json.MarshalIndent(m, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("failed marshaling %q: %w", m.path, err)
|
return 0, fmt.Errorf("failed marshaling %q: %w", m.path, err)
|
||||||
|
@ -1401,25 +1454,30 @@ func (m *Manifest) Write() (int64, error) {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp := f.Name()
|
|
||||||
// In correct operation, Remove() should fail because the file was renamed
|
// In correct operation, Remove() should fail because the file was renamed
|
||||||
defer os.Remove(tmp)
|
defer os.Remove(tmp)
|
||||||
|
|
||||||
err = func() (rErr error) {
|
err = func() (rErr error) {
|
||||||
// Close() before rename for Windows
|
// Close() before rename for Windows
|
||||||
defer errors2.Capture(&rErr, f.Close)()
|
defer errors2.Capture(&rErr, f.Close)()
|
||||||
|
|
||||||
|
tmp = f.Name()
|
||||||
|
|
||||||
|
if err = f.Chmod(0666); err != nil {
|
||||||
|
return fmt.Errorf("failed setting permissions on manifest file %q: %w", tmp, err)
|
||||||
|
}
|
||||||
if _, err = f.Write(buf); err != nil {
|
if _, err = f.Write(buf); err != nil {
|
||||||
return fmt.Errorf("failed writing temporary manifest file %q: %w", tmp, err)
|
return fmt.Errorf("failed writing temporary manifest file %q: %w", tmp, err)
|
||||||
}
|
}
|
||||||
|
if err = f.Sync(); err != nil {
|
||||||
|
return fmt.Errorf("failed syncing temporary manifest file to disk %q: %w", tmp, err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}()
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = os.Chmod(tmp, 0666); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = os.Rename(tmp, m.path); err != nil {
|
if err = os.Rename(tmp, m.path); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/tsdb"
|
"github.com/influxdata/influxdb/v2/tsdb"
|
||||||
|
@ -82,6 +83,70 @@ func TestPartition_Manifest(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var badManifestPath string = filepath.Join(os.DevNull, tsi1.ManifestFileName)
|
||||||
|
|
||||||
|
func TestPartition_Manifest_Write_Fail(t *testing.T) {
|
||||||
|
t.Run("write MANIFEST", func(t *testing.T) {
|
||||||
|
m := tsi1.NewManifest(badManifestPath)
|
||||||
|
_, err := m.Write()
|
||||||
|
if !errors.Is(err, syscall.ENOTDIR) {
|
||||||
|
t.Fatalf("expected: syscall.ENOTDIR, got %T: %v", err, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPartition_PrependLogFile_Write_Fail(t *testing.T) {
|
||||||
|
t.Run("write MANIFEST", func(t *testing.T) {
|
||||||
|
sfile := MustOpenSeriesFile()
|
||||||
|
defer sfile.Close()
|
||||||
|
|
||||||
|
p := MustOpenPartition(sfile.SeriesFile)
|
||||||
|
defer func() {
|
||||||
|
if err := p.Close(); err != nil {
|
||||||
|
t.Fatalf("error closing partition: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
p.Partition.MaxLogFileSize = -1
|
||||||
|
fileN := p.FileN()
|
||||||
|
p.CheckLogFile()
|
||||||
|
if fileN >= p.FileN() {
|
||||||
|
t.Fatalf("manifest write prepending log file should have succeeded but number of files did not change correctly: expected more than %d files, got %d files", fileN, p.FileN())
|
||||||
|
}
|
||||||
|
p.SetManifestPathForTest(badManifestPath)
|
||||||
|
fileN = p.FileN()
|
||||||
|
p.CheckLogFile()
|
||||||
|
if fileN != p.FileN() {
|
||||||
|
t.Fatalf("manifest write prepending log file should have failed, but number of files changed: expected %d files, got %d files", fileN, p.FileN())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPartition_Compact_Write_Fail(t *testing.T) {
|
||||||
|
t.Run("write MANIFEST", func(t *testing.T) {
|
||||||
|
sfile := MustOpenSeriesFile()
|
||||||
|
defer sfile.Close()
|
||||||
|
|
||||||
|
p := MustOpenPartition(sfile.SeriesFile)
|
||||||
|
defer func() {
|
||||||
|
if err := p.Close(); err != nil {
|
||||||
|
t.Fatalf("error closing partition: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
p.Partition.MaxLogFileSize = -1
|
||||||
|
fileN := p.FileN()
|
||||||
|
p.Compact()
|
||||||
|
if (1 + fileN) != p.FileN() {
|
||||||
|
t.Fatalf("manifest write in compaction should have succeeded, but number of files did not change correctly: expected %d files, got %d files", fileN+1, p.FileN())
|
||||||
|
}
|
||||||
|
p.SetManifestPathForTest(badManifestPath)
|
||||||
|
fileN = p.FileN()
|
||||||
|
p.Compact()
|
||||||
|
if fileN != p.FileN() {
|
||||||
|
t.Fatalf("manifest write should have failed the compaction, but number of files changed: expected %d files, got %d files", fileN, p.FileN())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Partition is a test wrapper for tsi1.Partition.
|
// Partition is a test wrapper for tsi1.Partition.
|
||||||
type Partition struct {
|
type Partition struct {
|
||||||
*tsi1.Partition
|
*tsi1.Partition
|
||||||
|
|
Loading…
Reference in New Issue