Merge pull request #10152 from influxdata/er-madvise
Add option to hint MADV_WILLNEED to kernel
pull/10113/head
commit
c4947d9901
|
@ -116,6 +116,11 @@
|
|||
# disabled by setting it to 0.
|
||||
# max-values-per-tag = 100000
|
||||
|
||||
# If true, then the madvise value MADV_WILLNEED will be provided to the kernel with respect to
|
||||
# TSM files. This setting has been found to be problematic on some kernels, and defaults to off.
|
||||
# It might help users who have slow disks in some cases.
|
||||
# tsm-use-madv-willneed = false
|
||||
|
||||
###
|
||||
### [coordinator]
|
||||
###
|
||||
|
|
|
@ -105,6 +105,12 @@ type Config struct {
|
|||
MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`
|
||||
|
||||
TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
|
||||
|
||||
// TSMWillNeed controls whether we hint to the kernel that we intend to
|
||||
// page in mmap'd sections of TSM files. This setting defaults to off, as it has
|
||||
// been found to be problematic in some cases. It may help users who have
|
||||
// slow disks.
|
||||
TSMWillNeed bool `toml:"tsm-use-madv-willneed"`
|
||||
}
|
||||
|
||||
// NewConfig returns the default configuration for tsdb.
|
||||
|
@ -127,6 +133,7 @@ func NewConfig() Config {
|
|||
MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
|
||||
|
||||
TraceLoggingEnabled: false,
|
||||
TSMWillNeed: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ func TestConfig_Parse(t *testing.T) {
|
|||
dir = "/var/lib/influxdb/data"
|
||||
wal-dir = "/var/lib/influxdb/wal"
|
||||
wal-fsync-delay = "10s"
|
||||
tsm-use-madv-willneed = true
|
||||
`, &c); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -32,6 +33,9 @@ wal-fsync-delay = "10s"
|
|||
if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() {
|
||||
t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
|
||||
}
|
||||
if got, exp := c.TSMWillNeed, true; got != exp {
|
||||
t.Errorf("unexpected tsm-madv-willneed:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfig_Validate_Error(t *testing.T) {
|
||||
|
|
|
@ -211,6 +211,8 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
|
|||
if opt.FileStoreObserver != nil {
|
||||
fs.WithObserver(opt.FileStoreObserver)
|
||||
}
|
||||
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
|
||||
|
||||
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
|
||||
|
||||
c := NewCompactor()
|
||||
|
|
|
@ -178,7 +178,7 @@ type FileStore struct {
|
|||
dir string
|
||||
|
||||
files []TSMFile
|
||||
|
||||
tsmMMAPWillNeed bool // If true then the kernel will be advised MADV_WILLNEED for TSM files.
|
||||
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
|
||||
|
||||
logger *zap.Logger // Logger to be used for important messages
|
||||
|
@ -532,7 +532,7 @@ func (f *FileStore) Open() error {
|
|||
defer f.openLimiter.Release()
|
||||
|
||||
start := time.Now()
|
||||
df, err := NewTSMReader(file)
|
||||
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
|
||||
f.logger.Info("Opened file",
|
||||
zap.String("path", file.Name()),
|
||||
zap.Int("id", idx),
|
||||
|
@ -750,7 +750,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
}
|
||||
}
|
||||
|
||||
tsm, err := NewTSMReader(fd)
|
||||
tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -27,6 +27,12 @@ func munmap(b []byte) (err error) {
|
|||
return unix.Munmap(b)
|
||||
}
|
||||
|
||||
// madviseWillNeed gives the kernel the mmap madvise value MADV_WILLNEED, hinting
|
||||
// that we plan on using the provided buffer in the near future.
// Any error returned by madvise is passed back to the caller unchanged.
|
||||
func madviseWillNeed(b []byte) error {
|
||||
return madvise(b, syscall.MADV_WILLNEED)
|
||||
}
|
||||
|
||||
// madviseDontNeed gives the kernel the madvise value MADV_DONTNEED for the
// provided buffer. Any error returned by madvise is passed back to the caller.
func madviseDontNeed(b []byte) error {
|
||||
return madvise(b, syscall.MADV_DONTNEED)
|
||||
}
|
||||
|
|
|
@ -121,10 +121,11 @@ func munmap(b []byte) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// madviseDontNeed is a stub: it ignores the buffer and always returns nil.
func madviseDontNeed(b []byte) error {
|
||||
// Not supported
|
||||
return nil
|
||||
}
|
||||
// madviseWillNeed is unsupported on Windows.
|
||||
func madviseWillNeed(b []byte) error {
	// The hint cannot be issued here, so the buffer is ignored and success
	// is reported unconditionally.
	return nil
}
|
||||
|
||||
// madviseDontNeed is unsupported on Windows.
|
||||
func madviseDontNeed(b []byte) error {
	// The hint cannot be issued here, so the buffer is ignored and success
	// is reported unconditionally.
	return nil
}
|
||||
|
||||
func madvise(b []byte, advice int) error {
|
||||
// Not implemented
|
||||
|
|
|
@ -30,6 +30,7 @@ type TSMReader struct {
|
|||
refs int64
|
||||
refsWG sync.WaitGroup
|
||||
|
||||
madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED.
|
||||
mu sync.RWMutex
|
||||
|
||||
// accessor provides access and decoding of blocks for the reader.
|
||||
|
@ -207,9 +208,21 @@ func (b *BlockIterator) Err() error {
|
|||
return b.err
|
||||
}
|
||||
|
||||
// tsmReaderOption is a functional option that NewTSMReader applies to the
// TSMReader it is constructing.
type tsmReaderOption func(*TSMReader)
|
||||
|
||||
// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILLNEED hint to the kernel.
|
||||
var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
|
||||
return func(r *TSMReader) {
|
||||
r.madviseWillNeed = willNeed
|
||||
}
|
||||
}
|
||||
|
||||
// NewTSMReader returns a new TSMReader from the given file.
|
||||
func NewTSMReader(f *os.File) (*TSMReader, error) {
|
||||
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
|
||||
t := &TSMReader{}
|
||||
for _, option := range options {
|
||||
option(t)
|
||||
}
|
||||
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
|
@ -219,6 +232,7 @@ func NewTSMReader(f *os.File) (*TSMReader, error) {
|
|||
t.lastModified = stat.ModTime().UnixNano()
|
||||
t.accessor = &mmapAccessor{
|
||||
f: f,
|
||||
mmapWillNeed: t.madviseWillNeed,
|
||||
}
|
||||
|
||||
index, err := t.accessor.init()
|
||||
|
@ -1286,15 +1300,15 @@ func (d *indirectIndex) Close() error {
|
|||
// mmapAccessor is an mmap-based block accessor. It accesses blocks through an
|
||||
// MMAP file interface.
|
||||
type mmapAccessor struct {
|
||||
// Counter incremented every time the mmapAccessor is accessed
|
||||
accessCount uint64
|
||||
// Counter to determine whether the accessor can free its resources
|
||||
freeCount uint64
|
||||
accessCount uint64 // Counter incremented every time the mmapAccessor is accessed
|
||||
freeCount uint64 // Counter to determine whether the accessor can free its resources
|
||||

||||
mmapWillNeed bool // If true then the madvise value MADV_WILLNEED will be provided to the kernel for b.
|
||||

||||
mu sync.RWMutex
|
||||

||||
f *os.File
|
||||
b []byte
|
||||
f *os.File
|
||||

||||
index *indirectIndex
|
||||
}
|
||||
|
||||
|
@ -1325,6 +1339,15 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {
|
|||
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
|
||||
}
|
||||
|
||||
// Hint to the kernel that we will be reading the file. It would be better to hint
|
||||
// that we will be reading the index section, but that's not been
|
||||
// implemented as yet.
|
||||
if m.mmapWillNeed {
|
||||
if err := madviseWillNeed(m.b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
indexOfsPos := len(m.b) - 8
|
||||
indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8])
|
||||
if indexStart >= uint64(indexOfsPos) {
|
||||
|
@ -1408,9 +1431,16 @@ func (m *mmapAccessor) rename(path string) error {
|
|||
}
|
||||
|
||||
m.b, err = mmap(m.f, 0, int(stat.Size()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if m.mmapWillNeed {
|
||||
return madviseWillNeed(m.b)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
|
||||
entry := m.index.Entry(key, timestamp)
|
||||
if entry == nil {
|
||||
|
|
Loading…
Reference in New Issue