fix: do not rename files on mmap failure (#25340)
If NewTSMReader() fails because mmap fails, do not
rename the file, because the error is probably
caused by vm.max_map_count being too low
Closes: #25337
(cherry picked from commit ec412f793b)
pull/25358/head
parent
5a599383f1
commit
5aff511e40
|
@ -172,12 +172,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
|
|||
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
|
||||
}
|
||||
|
||||
fs := NewFileStore(path, etags)
|
||||
fs := NewFileStore(path, etags, WithMadviseWillNeed(opt.Config.TSMWillNeed))
|
||||
fs.openLimiter = opt.OpenLimiter
|
||||
if opt.FileStoreObserver != nil {
|
||||
fs.WithObserver(opt.FileStoreObserver)
|
||||
}
|
||||
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
|
||||
|
||||
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)
|
||||
|
||||
|
|
|
@ -176,9 +176,8 @@ type FileStore struct {
|
|||
currentGeneration int
|
||||
dir string
|
||||
|
||||
files []TSMFile
|
||||
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
|
||||
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
|
||||
files []TSMFile
|
||||
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
|
||||
|
||||
logger *zap.Logger // Logger to be used for important messages
|
||||
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
|
||||
|
@ -198,6 +197,8 @@ type FileStore struct {
|
|||
// newReaderBlockCount keeps track of the current new reader block requests.
|
||||
// If non-zero, no new TSMReader objects may be created.
|
||||
newReaderBlockCount int
|
||||
|
||||
readerOptions []tsmReaderOption
|
||||
}
|
||||
|
||||
// FileStat holds information about a TSM file on disk.
|
||||
|
@ -234,7 +235,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
|
|||
}
|
||||
|
||||
// NewFileStore returns a new instance of FileStore based on the given directory.
|
||||
func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
|
||||
func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) *FileStore {
|
||||
logger := zap.NewNop()
|
||||
fs := &FileStore{
|
||||
dir: dir,
|
||||
|
@ -250,6 +251,7 @@ func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
|
|||
obs: noFileStoreObserver{},
|
||||
parseFileName: DefaultParseFileName,
|
||||
copyFiles: runtime.GOOS == "windows",
|
||||
readerOptions: options,
|
||||
}
|
||||
fs.purger.fileStore = fs
|
||||
return fs
|
||||
|
@ -616,28 +618,37 @@ func (f *FileStore) Open(ctx context.Context) error {
|
|||
defer f.openLimiter.Release()
|
||||
|
||||
start := time.Now()
|
||||
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
|
||||
df, err := NewTSMReader(file, f.readerOptions...)
|
||||
f.logger.Info("Opened file",
|
||||
zap.String("path", file.Name()),
|
||||
zap.Int("id", idx),
|
||||
zap.Duration("duration", time.Since(start)))
|
||||
|
||||
// If we are unable to read a TSM file then log the error, rename
|
||||
// the file, and continue loading the shard without it.
|
||||
// If we are unable to read a TSM file then log the error.
|
||||
if err != nil {
|
||||
if cerr := file.Close(); cerr != nil {
|
||||
f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr))
|
||||
}
|
||||
// If the file is corrupt, rename it and
|
||||
// continue loading the shard without it.
|
||||
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
|
||||
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
|
||||
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
|
||||
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)}
|
||||
if errors.Is(err, MmapError{}) {
|
||||
// An MmapError may indicate we have insufficient
|
||||
// handles for the mmap call, in which case the file should
|
||||
// be left untouched, and the vm.max_map_count be raised.
|
||||
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
|
||||
zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
|
||||
readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)}
|
||||
return
|
||||
} else {
|
||||
// If the file is corrupt, rename it and
|
||||
// continue loading the shard without it.
|
||||
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
|
||||
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
|
||||
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
|
||||
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
|
||||
return
|
||||
}
|
||||
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
|
||||
return
|
||||
}
|
||||
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)}
|
||||
return
|
||||
}
|
||||
df.WithObserver(f.obs)
|
||||
readerC <- &res{r: df}
|
||||
|
@ -920,7 +931,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
}
|
||||
}
|
||||
|
||||
tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
|
||||
tsm, err := NewTSMReader(fd, f.readerOptions...)
|
||||
if err != nil {
|
||||
if newName != oldName {
|
||||
if err1 := os.Rename(newName, oldName); err1 != nil {
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
)
|
||||
|
||||
var TestMmapInitFailOption = func(err error) tsmReaderOption {
|
||||
return func(r *TSMReader) {
|
||||
r.accessor = &badBlockAccessor{error: err}
|
||||
}
|
||||
}
|
||||
|
||||
type badBlockAccessor struct {
|
||||
error
|
||||
initCalled bool
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) init() (*indirectIndex, error) {
|
||||
b.initCalled = true
|
||||
return nil, b.error
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) rename(path string) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) path() string {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) close() error {
|
||||
if !b.initCalled {
|
||||
panic("close called without an init call")
|
||||
}
|
||||
b.initCalled = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *badBlockAccessor) free() error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
@ -2412,6 +2413,42 @@ func TestFileStore_Open(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFileStore_OpenFail(t *testing.T) {
|
||||
var err error
|
||||
dir := t.TempDir()
|
||||
|
||||
// Create a TSM file...
|
||||
data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}
|
||||
|
||||
files, err := newFileDir(t, dir, data)
|
||||
if err != nil {
|
||||
fatal(t, "creating test files", err)
|
||||
}
|
||||
assert.Equal(t, 1, len(files))
|
||||
f := files[0]
|
||||
|
||||
const mmapErrMsg = "mmap failure in test"
|
||||
const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg
|
||||
// With an mmap failure, the files should all be left where they are, because they are not corrupt
|
||||
openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
|
||||
assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure")
|
||||
|
||||
// With a non-mmap failure, the file failing to open should be moved aside
|
||||
const otherErrMsg = "some Random Init Failure"
|
||||
openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg))
|
||||
assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure")
|
||||
assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure")
|
||||
}
|
||||
|
||||
func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) {
|
||||
fs := tsm1.NewFileStore(dir, tsdb.EngineTags{}, tsm1.TestMmapInitFailOption(initErr))
|
||||
err := fs.Open(context.Background())
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), fullErrMsg)
|
||||
defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }()
|
||||
assert.Equal(t, 0, fs.Count(), "file count mismatch")
|
||||
}
|
||||
|
||||
func TestFileStore_Remove(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package tsm1
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
|
@ -218,6 +219,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(DSB) - add a tsmReaderOption in a test call that has the mmapAccessor mock a failure
|
||||
// NewTSMReader returns a new TSMReader from the given file.
|
||||
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
|
||||
t := &TSMReader{}
|
||||
|
@ -231,15 +233,17 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
|
|||
}
|
||||
t.size = stat.Size()
|
||||
t.lastModified = stat.ModTime().UnixNano()
|
||||
t.accessor = &mmapAccessor{
|
||||
f: f,
|
||||
mmapWillNeed: t.madviseWillNeed,
|
||||
if t.accessor == nil {
|
||||
t.accessor = &mmapAccessor{
|
||||
f: f,
|
||||
mmapWillNeed: t.madviseWillNeed,
|
||||
}
|
||||
}
|
||||
|
||||
index, err := t.accessor.init()
|
||||
if err != nil {
|
||||
_ = t.accessor.close()
|
||||
return nil, err
|
||||
cerr := t.accessor.close()
|
||||
return nil, errors.Join(err, cerr)
|
||||
}
|
||||
|
||||
t.index = index
|
||||
|
@ -1314,6 +1318,24 @@ type mmapAccessor struct {
|
|||
index *indirectIndex
|
||||
}
|
||||
|
||||
// MmapError marks an error as having come from a failed mmap call, so that
// callers can distinguish it (via errors.Is(err, MmapError{})) from other
// failures, such as a corrupt file. An mmap failure may indicate that
// vm.max_map_count is too low rather than that the file is bad.
type MmapError struct {
	error
}

// Unwrap returns the underlying mmap error.
//
// It uses a value receiver so that it is in the method set of both MmapError
// and *MmapError. NewMmapError returns a value, and with a pointer receiver
// errors.Is / errors.As could not see through that value to the wrapped
// error.
func (m MmapError) Unwrap() error {
	return m.error
}

// Is reports whether e is an MmapError or *MmapError, regardless of which
// error it wraps. This lets errors.Is(err, MmapError{}) match any mmap
// failure.
func (m MmapError) Is(e error) bool {
	switch e.(type) {
	case MmapError, *MmapError:
		return true
	default:
		return false
	}
}

// NewMmapError wraps e in an MmapError.
func NewMmapError(e error) MmapError {
	return MmapError{error: e}
}
|
||||
|
||||
func (m *mmapAccessor) init() (*indirectIndex, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
@ -1335,7 +1357,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {
|
|||
|
||||
m.b, err = mmap(m.f, 0, int(stat.Size()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Wrap the error to let callers know this was an error
|
||||
// from mmap, and may indicate vm.max_map_count is too low
|
||||
return nil, NewMmapError(err)
|
||||
}
|
||||
if len(m.b) < 8 {
|
||||
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
|
||||
|
|
Loading…
Reference in New Issue