fix: do not leak file handles from Compactor.write (#25725) (#25740)

There are a number of code paths in Compactor.write that, on error,
can leak file handles to temporary files. The leaked handles, in turn,
prevent the temporary files from being removed until InfluxDB is
restarted, which is when the handles are finally released.

closes https://github.com/influxdata/influxdb/issues/25724

(cherry picked from commit e974165d25)

closes https://github.com/influxdata/influxdb/issues/25739
pull/25770/head
davidby-influx 2025-01-06 09:03:37 -08:00 committed by GitHub
parent 5b364b51c8
commit c82d4f86ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 175 additions and 113 deletions

View File

@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
@ -65,6 +66,10 @@ func (e errCompactionInProgress) Error() string {
return "compaction in progress"
}
// Unwrap returns the underlying error wrapped by errCompactionInProgress,
// allowing callers to inspect the cause with errors.Is / errors.As
// (e.g. to test for fs.ErrExist when the output file already exists).
func (e errCompactionInProgress) Unwrap() error {
	return e.err
}
// errCompactionAborted signals that a compaction was stopped before
// completion (e.g. when compactions are disabled mid-run); err holds
// the underlying cause, if any.
type errCompactionAborted struct {
	err error
}
@ -1051,6 +1056,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
// These are the new TSM files written
var files []string
var eInProgress errCompactionInProgress
for {
sequence++
@ -1060,15 +1066,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))
// Write as much as possible to this file
err := c.write(fileName, iter, throttle, logger)
rollToNext, err := c.write(fileName, iter, throttle, logger)
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
if rollToNext {
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
files = append(files, fileName)
logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
continue
} else if err == ErrNoValues {
} else if errors.Is(err, ErrNoValues) {
logger.Debug("Dropping empty file", zap.String("output_file", fileName))
// If the file only contained tombstoned entries, then it would be a 0 length
// file that we can drop.
@ -1076,9 +1082,14 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
return nil, err
}
break
} else if _, ok := err.(errCompactionInProgress); ok {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
} else if errors.As(err, &eInProgress) {
if !errors.Is(eInProgress.err, fs.ErrExist) {
logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err))
} else {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
}
return nil, err
} else if err != nil {
// We hit an error and didn't finish the compaction. Abort.
@ -1100,10 +1111,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
return files, nil
}
func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
return false, errCompactionInProgress{err: err}
}
// syncingWriter ensures that whatever we wrap the above file descriptor in
@ -1128,33 +1139,31 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
// in memory.
if iter.EstimatedIndexSize() > 64*1024*1024 {
w, err = NewTSMWriterWithDiskBuffer(limitWriter)
if err != nil {
return err
}
} else {
w, err = NewTSMWriter(limitWriter)
if err != nil {
return err
}
}
if err != nil {
// Close the file and return if we can't create the TSMWriter
return false, errors.Join(err, fd.Close())
}
defer func() {
var eInProgress errCompactionInProgress
errs := make([]error, 0, 3)
errs = append(errs, err)
closeErr := w.Close()
if err == nil {
err = closeErr
}
errs = append(errs, closeErr)
// Check for errors where we should not remove the file
_, inProgress := err.(errCompactionInProgress)
maxBlocks := err == ErrMaxBlocksExceeded
maxFileSize := err == errMaxFileExceeded
if inProgress || maxBlocks || maxFileSize {
// Check for conditions where we should not remove the file
inProgress := errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist)
if (closeErr == nil) && (inProgress || rollToNext) {
// do not join errors, there is only the one.
return
} else if err != nil || closeErr != nil {
// Remove the file, we have had a problem
errs = append(errs, w.Remove())
}
if err != nil {
_ = w.Remove()
}
err = errors.Join(errs...)
}()
lastLogSize := w.Size()
@ -1164,38 +1173,38 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
c.mu.RUnlock()
if !enabled {
return errCompactionAborted{}
return false, errCompactionAborted{}
}
// Each call to read returns the next sorted key (or the prior one if there are
// more values to write). The size of values will be less than or equal to our
// chunk size (1000)
key, minTime, maxTime, block, err := iter.Read()
if err != nil {
return err
return false, err
}
if minTime > maxTime {
return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
}
// Write the key and value
if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) {
if err := w.WriteIndex(); err != nil {
return err
return false, err
}
return err
return true, err
} else if err != nil {
return err
return false, err
}
// If we have a max file size configured and we're over it, close out the file
// If we're over maxTSMFileSize, close out the file
// and return the error.
if w.Size() > maxTSMFileSize {
if err := w.WriteIndex(); err != nil {
return err
return false, err
}
return errMaxFileExceeded
return true, errMaxFileExceeded
} else if (w.Size() - lastLogSize) > logEvery {
logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
lastLogSize = w.Size()
@ -1204,15 +1213,15 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
// Were there any errors encountered during iteration?
if err := iter.Err(); err != nil {
return err
return false, err
}
// We're all done. Close out the file.
if err := w.WriteIndex(); err != nil {
return err
return false, err
}
logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
return nil
return false, nil
}
func (c *Compactor) add(files []string) bool {

View File

@ -1,7 +1,9 @@
package tsm1_test
import (
"errors"
"fmt"
"io/fs"
"math"
"os"
"path/filepath"
@ -12,6 +14,7 @@ import (
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
@ -115,12 +118,12 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
}
f2 := MustWriteTSM(t, dir, 2, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop())
@ -174,11 +177,11 @@ func TestCompactor_CompactFull(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
@ -284,11 +287,11 @@ func TestCompactor_DecodeError(t *testing.T) {
f.WriteAt([]byte("ffff"), 10) // skip over header
f.Close()
fs := &fakeFileStore{}
defer fs.Close()
ffs := &fakeFileStore{}
defer ffs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
@ -329,11 +332,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
@ -409,11 +412,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
@ -627,11 +630,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
@ -729,11 +732,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
@ -832,11 +835,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
@ -940,11 +943,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
}
f3 := MustWriteTSM(t, dir, 3, writes)
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Size = 2
compactor.Open()
@ -1056,11 +1059,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
}
f2.Close()
fs := &fakeFileStore{}
t.Cleanup(func() { fs.Close() })
ffs := &fakeFileStore{}
t.Cleanup(func() { ffs.Close() })
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.FileStore = ffs
compactor.Open()
// Compact both files, should get 2 files back
@ -1093,6 +1096,60 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
}
}
// TestCompactor_CompactFull_InProgress verifies that CompactFull fails
// when the temporary output file it wants to create already exists
// (simulating another compaction in progress), and that the resulting
// error wraps an *os.PathError satisfying fs.ErrExist.
func TestCompactor_CompactFull_InProgress(t *testing.T) {
	// This test creates a lot of data and causes timeout failures for these envs
	if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" {
		t.Skip("Skipping in progress compaction test")
	}
	dir := t.TempDir()

	// Build a source TSM file holding two 1000-point blocks for one key.
	srcName := func() string {
		vals := make([]tsm1.Value, 1000)
		// Write a new file with 2 blocks
		w, name := MustTSMWriter(t, dir, 2)
		defer func() {
			assert.NoError(t, w.Close(), "closing TSM file %s", name)
		}()
		for blk := 0; blk < 2; blk++ {
			vals = vals[:0]
			for pt := 0; pt < 1000; pt++ {
				vals = append(vals, tsm1.NewValue(int64(blk*1000+pt), int64(1)))
			}
			assert.NoError(t, w.Write([]byte("cpu,host=A#!~#value"), vals), "writing TSM file: %s", name)
		}
		assert.NoError(t, w.WriteIndex(), "writing TSM file index for %s", name)
		return name
	}()

	ffs := &fakeFileStore{}
	defer ffs.Close()
	c := tsm1.NewCompactor()
	c.Dir = dir
	c.FileStore = ffs
	c.Open()

	// Predict the temporary file name the compaction will attempt to create:
	// same generation as the source, next sequence number.
	gen, seq, err := tsm1.DefaultParseFileName(srcName)
	assert.NoError(t, err, "unexpected error parsing file name %s", srcName)
	seq++
	tmpName := filepath.Join(c.Dir, tsm1.DefaultFormatFileName(gen, seq)+"."+tsm1.TSMFileExtension+"."+tsm1.TmpTSMFileExtension)

	// Create a temp file to simulate an in progress compaction
	blocker, err := os.Create(tmpName)
	assert.NoError(t, err, "creating in-progress compaction file %s", tmpName)
	defer func() {
		assert.NoError(t, blocker.Close(), "closing in-progress compaction file %s", tmpName)
	}()

	_, err = c.CompactFull([]string{srcName}, zap.NewNop())
	assert.Errorf(t, err, "expected an error writing snapshot for %s", srcName)

	// The compaction error must wrap the file-exists condition.
	wrapped := errors.Unwrap(err)
	assert.NotNil(t, wrapped, "expected an error wrapped by errCompactionInProgress")
	assert.Truef(t, errors.Is(wrapped, fs.ErrExist), "error did not indicate file existence: %v", wrapped)
	pathErr := &os.PathError{}
	assert.Truef(t, errors.As(wrapped, &pathErr), "expected path error, got %v", wrapped)
	assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr)
}
// Tests that a single TSM file can be read and iterated over
func TestTSMKeyIterator_Single(t *testing.T) {
dir := t.TempDir()
@ -2542,14 +2599,14 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
},
}
fs := &fakeFileStore{
ffs := &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return testSet
},
blockCount: 1000,
}
cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond)
cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond)
plan, pLen := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(plan[0]); got != exp {
@ -2608,9 +2665,9 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
}
cp.Release(plan)
cp.FileStore = fs
cp.FileStore = ffs
// ensure that it will plan if last modified has changed
fs.lastModified = time.Now()
ffs.lastModified = time.Now()
cGroups, pLen := cp.Plan(time.Now())
if exp, got := 4, len(cGroups[0]); got != exp {
@ -2706,7 +2763,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
},
}
fs := &fakeFileStore{
ffs := &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return testSet
},
@ -2714,7 +2771,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
}
cp := tsm1.NewDefaultPlanner(
fs,
ffs,
time.Nanosecond,
)

View File

@ -66,6 +66,7 @@ import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
@ -511,19 +512,15 @@ func (d *directIndex) Size() uint32 {
}
func (d *directIndex) Close() error {
errs := make([]error, 0, 3)
// Flush anything remaining in the index
if err := d.w.Flush(); err != nil {
return err
errs = append(errs, d.w.Flush())
if d.fd != nil {
// Close and remove the temporary index file
errs = append(errs, d.fd.Close())
errs = append(errs, os.Remove(d.fd.Name()))
}
if d.fd == nil {
return nil
}
if err := d.fd.Close(); err != nil {
return err
}
return os.Remove(d.fd.Name())
return errors.Join(errs...)
}
// Remove removes the index from any temporary storage
@ -532,11 +529,14 @@ func (d *directIndex) Remove() error {
return nil
}
// Close the file handle to prevent leaking. We ignore the error because
// we just want to cleanup and remove the file.
_ = d.fd.Close()
return os.Remove(d.fd.Name())
errs := make([]error, 0, 2)
// Close the file handle to prevent leaking.
// We don't let an error stop the removal.
if err := d.fd.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
errs = append(errs, err)
}
errs = append(errs, os.Remove(d.fd.Name()))
return errors.Join(errs...)
}
// tsmWriter writes keys and values in the TSM format
@ -756,25 +756,19 @@ func (t *tsmWriter) sync() error {
}
func (t *tsmWriter) Close() error {
if err := t.Flush(); err != nil {
return err
}
if err := t.index.Close(); err != nil {
return err
}
errs := make([]error, 0, 3)
errs = append(errs, t.Flush())
errs = append(errs, t.index.Close())
if c, ok := t.wrapped.(io.Closer); ok {
return c.Close()
errs = append(errs, c.Close())
}
return nil
return errors.Join(errs...)
}
// Remove removes any temporary storage used by the writer.
func (t *tsmWriter) Remove() error {
if err := t.index.Remove(); err != nil {
return err
}
errs := make([]error, 0, 3)
errs = append(errs, t.index.Remove())
// nameCloser is the most permissive interface we can close the wrapped
// value with.
@ -783,14 +777,16 @@ func (t *tsmWriter) Remove() error {
Name() string
}
// If the writer is not a memory buffer, we can remove the file.
if f, ok := t.wrapped.(nameCloser); ok {
// Close the file handle to prevent leaking. We ignore the error because
// we just want to cleanup and remove the file.
_ = f.Close()
return os.Remove(f.Name())
// Close the file handle to prevent leaking.
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
errs = append(errs, err)
}
// Remove the file
errs = append(errs, os.Remove(f.Name()))
}
return nil
return errors.Join(errs...)
}
func (t *tsmWriter) Size() uint32 {