fix: do not rename files on mmap failure (#23396)

If NewTSMReader() fails because mmap fails, do not
rename the file, because the error is probably
caused by vm.max_map_count being too low

closes https://github.com/influxdata/influxdb/issues/23172
pull/23408/head
davidby-influx 2022-06-07 08:37:00 -07:00 committed by GitHub
parent 0ae0bd6e2e
commit ec412f793b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 202 additions and 21 deletions

View File

@ -216,12 +216,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}
fs := NewFileStore(path)
fs := NewFileStore(path, WithMadviseWillNeed(opt.Config.TSMWillNeed))
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))

View File

@ -178,9 +178,8 @@ type FileStore struct {
currentGeneration int
dir string
files []TSMFile
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
files []TSMFile
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
@ -196,6 +195,8 @@ type FileStore struct {
obs tsdb.FileStoreObserver
copyFiles bool
readerOptions []tsmReaderOption
}
// FileStat holds information about a TSM file on disk.
@ -232,7 +233,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
}
// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string) *FileStore {
func NewFileStore(dir string, options ...tsmReaderOption) *FileStore {
logger := zap.NewNop()
fs := &FileStore{
dir: dir,
@ -248,6 +249,7 @@ func NewFileStore(dir string) *FileStore {
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
copyFiles: runtime.GOOS == "windows",
readerOptions: options,
}
fs.purger.fileStore = fs
return fs
@ -554,26 +556,36 @@ func (f *FileStore) Open() error {
defer f.openLimiter.Release()
start := time.Now()
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
df, err := NewTSMReader(file, f.readerOptions...)
f.logger.Info("Opened file",
zap.String("path", file.Name()),
zap.Int("id", idx),
zap.Duration("duration", time.Since(start)))
// If we are unable to read a TSM file then log the error, rename
// the file, and continue loading the shard without it.
// If we are unable to read a TSM file then log the error.
if err != nil {
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
file.Close()
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
if errors.Is(err, MmapError{}) {
// An MmapError may indicate we have insufficient
// handles for the mmap call, in which case the file should
// be left untouched, and the vm.max_map_count be raised.
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)}
return
} else {
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
return
}
df.WithObserver(f.obs)
readerC <- &res{r: df}
}(i, file)
@ -793,7 +805,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
}
tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
tsm, err := NewTSMReader(fd, f.readerOptions...)
if err != nil {
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {

View File

@ -0,0 +1,109 @@
package tsm1
import (
"github.com/influxdata/influxdb/tsdb"
)
var TestMmapInitFailOption = func(err error) tsmReaderOption {
return func(r *TSMReader) {
r.accessor = &badBlockAccessor{error: err}
}
}
type badBlockAccessor struct {
error
}
func (b *badBlockAccessor) init() (*indirectIndex, error) {
return nil, b.error
}
func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) rename(path string) error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) path() string {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) close() error {
//TODO implement me
panic("implement me")
}
func (b *badBlockAccessor) free() error {
//TODO implement me
panic("implement me")
}

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
)
func TestFileStore_Read(t *testing.T) {
@ -2375,6 +2376,43 @@ func TestFileStore_Open(t *testing.T) {
}
}
func TestFileStore_OpenFail(t *testing.T) {
var err error
dir := MustTempDir()
defer func() { assert.NoError(t, os.RemoveAll(dir), "failed to remove temporary directory") }()
// Create a TSM file...
data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}
files, err := newFileDir(dir, data)
if err != nil {
fatal(t, "creating test files", err)
}
assert.Equal(t, 1, len(files))
f := files[0]
const mmapErrMsg = "mmap failure in test"
const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg
// With an mmap failure, the files should all be left where they are, because they are not corrupt
openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure")
// With a non-mmap failure, the file failing to open should be moved aside
const otherErrMsg = "some Random Init Failure"
openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg))
assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure")
assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure")
}
func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) {
fs := tsm1.NewFileStore(dir, tsm1.TestMmapInitFailOption(initErr))
err := fs.Open()
assert.Error(t, err)
assert.Contains(t, err.Error(), fullErrMsg)
defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }()
assert.Equal(t, 0, fs.Count(), "file count mismatch")
}
func TestFileStore_Remove(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)

View File

@ -218,6 +218,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
}
}
// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure
// NewTSMReader returns a new TSMReader from the given file.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
t := &TSMReader{}
@ -231,9 +232,11 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
if t.accessor == nil {
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
}
}
index, err := t.accessor.init()
@ -1341,6 +1344,24 @@ func verifyVersion(r io.Reader) error {
return nil
}
type MmapError struct {
error
}
func (m *MmapError) Unwrap() error {
return m.error
}
func (m MmapError) Is(e error) bool {
_, oks := e.(MmapError)
_, okp := e.(*MmapError)
return oks || okp
}
func NewMmapError(e error) MmapError {
return MmapError{error: e}
}
func (m *mmapAccessor) init() (*indirectIndex, error) {
m.mu.Lock()
defer m.mu.Unlock()
@ -1366,7 +1387,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {
m.b, err = mmap(m.f, 0, int(stat.Size()))
if err != nil {
return nil, err
// Wrap the error to let callers know this was an error
// from mmap, and may indicate vm.max_map_count is too low
return nil, NewMmapError(err)
}
if len(m.b) < 8 {
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")