fix: remove temp files on error in Compactor.writeNewFiles (#26074)
Compactor.writeNewFiles should delete temporary files created on iterations before an error halts the compaction. closes https://github.com/influxdata/influxdb/issues/26073db/update-influx_tools
parent
ba95c9b0f0
commit
1efb8dad43
|
@ -1085,6 +1085,17 @@ func (c *Compactor) RemoveTmpFiles(files []string) error {
|
|||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error {
|
||||
removeErr := c.RemoveTmpFiles(files)
|
||||
if removeErr == nil {
|
||||
return errors.Join(originalErrs...)
|
||||
} else if errJoin, ok := removeErr.(interface{ Unwrap() []error }); ok {
|
||||
return errors.Join(append(originalErrs, errJoin.Unwrap()...)...)
|
||||
} else {
|
||||
return errors.Join(append(originalErrs, removeErr)...)
|
||||
}
|
||||
}
|
||||
|
||||
// writeNewFiles writes from the iterator into new TSM files, rotating
|
||||
// to a new file once it has reached the max TSM file size.
|
||||
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
|
||||
|
@ -1113,7 +1124,8 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
// If the file only contained tombstoned entries, then it would be a 0 length
|
||||
// file that we can drop.
|
||||
if err := os.RemoveAll(fileName); err != nil {
|
||||
return nil, err
|
||||
// Only return an error if we couldn't remove the temp files
|
||||
return nil, c.RemoveTmpFilesOnErr(files, err)
|
||||
}
|
||||
break
|
||||
} else if errors.As(err, &eInProgress) {
|
||||
|
@ -1124,27 +1136,14 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
// planner keeps track of which files are assigned to compaction plans now.
|
||||
logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
|
||||
}
|
||||
return nil, err
|
||||
} else if err != nil {
|
||||
var errs []error
|
||||
errs = append(errs, err)
|
||||
// We hit an error and didn't finish the compaction. Abort.
|
||||
// Remove any tmp files we already completed
|
||||
// discard later errors to return the first one from the write() call
|
||||
for _, f := range files {
|
||||
err = os.RemoveAll(f)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
// Remove the temp file
|
||||
// discard later errors to return the first one from the write() call
|
||||
err = os.RemoveAll(fileName)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
return nil, errors.Join(errs...)
|
||||
return nil, c.RemoveTmpFilesOnErr(files, err)
|
||||
} else if err != nil {
|
||||
// We hit an error and didn't finish the compaction. Abort.
|
||||
// Remove any tmp files we already completed, as well as the current
|
||||
// file we were writing to.
|
||||
return nil, c.RemoveTmpFilesOnErr(files, err, os.RemoveAll(fileName))
|
||||
}
|
||||
|
||||
files = append(files, fileName)
|
||||
|
|
Loading…
Reference in New Issue