Merge pull request #7480 from influxdata/er-fs-stats

Memoize output of FileStore.Stats
Edd Robinson 2016-10-24 18:26:02 +01:00 committed by GitHub
commit 3a1b811037
3 changed files with 121 additions and 6 deletions
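The change memoizes the result of FileStore.Stats: the FileStore keeps the most recently computed []FileStat and returns the same slice until the set of TSM files changes (Add, Remove, Replace, Close), at which point the cache is truncated and rebuilt on the next call. A minimal sketch of that pattern, using made-up names (statCache, setFiles) rather than anything from the diff:

package filestatcache

import "sync"

// FileStat stands in for tsm1.FileStat; only the shape matters for the sketch.
type FileStat struct {
	Path string
	Size uint32
}

// statCache memoizes an expensive per-file computation behind a sync.RWMutex,
// invalidating the cached slice whenever the file set changes.
type statCache struct {
	mu    sync.RWMutex
	files []string   // whatever the stats are derived from
	last  []FileStat // empty means the cache must be rebuilt
}

// Stats returns the cached slice when it is still valid, otherwise rebuilds it
// under the write lock.
func (c *statCache) Stats() []FileStat {
	c.mu.RLock()
	if len(c.last) > 0 {
		defer c.mu.RUnlock()
		return c.last
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	c.last = c.last[:0] // reuse the backing array where possible
	for _, f := range c.files {
		c.last = append(c.last, FileStat{Path: f})
	}
	return c.last
}

// setFiles changes the file set and truncates the cached slice (keeping its
// capacity) so the next Stats call recomputes it; Add/Remove/Replace in the
// diff below invalidate the cache in the same way.
func (c *statCache) setFiles(files []string) {
	c.mu.Lock()
	c.files = files
	c.last = c.last[:0]
	c.mu.Unlock()
}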

CHANGELOG.md

@@ -31,6 +31,7 @@
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.
- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors.
- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series.
- [#7480](https://github.com/influxdata/influxdb/pull/7480): Improve compaction planning performance by caching tsm file stats.
### Bugfixes

tsdb/engine/tsm1/file_store.go

@@ -124,6 +124,9 @@ const (
type FileStore struct {
mu sync.RWMutex
lastModified time.Time
// Most recently known file stats. If nil then stats will need to be
// recalculated
lastFileStats []FileStat
currentGeneration int
dir string
@@ -258,6 +261,7 @@ func (f *FileStore) Add(files ...TSMFile) {
for _, file := range files {
atomic.AddInt64(&f.stats.DiskBytes, int64(file.Size()))
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
@@ -285,6 +289,7 @@ func (f *FileStore) Remove(paths ...string) {
atomic.AddInt64(&f.stats.DiskBytes, -int64(file.Size()))
}
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
@@ -449,6 +454,7 @@ func (f *FileStore) Close() error {
file.Close()
}
f.lastFileStats = nil
f.files = nil
atomic.StoreInt64(&f.stats.FileCount, 0)
return nil
@@ -485,13 +491,27 @@ func (f *FileStore) KeyCursor(key string, t int64, ascending bool) *KeyCursor {
func (f *FileStore) Stats() []FileStat {
f.mu.RLock()
- defer f.mu.RUnlock()
- stats := make([]FileStat, len(f.files))
- for i, fd := range f.files {
- stats[i] = fd.Stats()
+ if len(f.lastFileStats) > 0 {
+ defer f.mu.RUnlock()
+ return f.lastFileStats
}
+ f.mu.RUnlock()
+ // The file stats cache is invalid due to changes to files. Need to
+ // recalculate.
+ f.mu.Lock()
+ // If lastFileStats's capacity is far away from the number of entries
+ // we need to add, then we'll reallocate.
+ if cap(f.lastFileStats) < len(f.files)/2 {
+ f.lastFileStats = make([]FileStat, 0, len(f.files))
+ }
- return stats
+ for _, fd := range f.files {
+ f.lastFileStats = append(f.lastFileStats, fd.Stats())
+ }
+ defer f.mu.Unlock()
+ return f.lastFileStats
}
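Assembled from the additions above, the new Stats method reads as follows (a reconstruction from this diff, not a verbatim copy of the file):

func (f *FileStore) Stats() []FileStat {
	f.mu.RLock()
	if len(f.lastFileStats) > 0 {
		defer f.mu.RUnlock()
		return f.lastFileStats
	}
	f.mu.RUnlock()

	// The file stats cache is invalid due to changes to files. Need to
	// recalculate.
	f.mu.Lock()

	// If lastFileStats's capacity is far away from the number of entries
	// we need to add, then we'll reallocate.
	if cap(f.lastFileStats) < len(f.files)/2 {
		f.lastFileStats = make([]FileStat, 0, len(f.files))
	}

	for _, fd := range f.files {
		f.lastFileStats = append(f.lastFileStats, fd.Stats())
	}
	defer f.mu.Unlock()
	return f.lastFileStats
}

The check happens under the read lock and the rebuild under the write lock, so concurrent callers hitting an already-populated cache never contend on the write lock.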
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
@@ -598,6 +618,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
// Tell the purger about our in-use files we need to remove
f.purger.add(inuse)
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))

tsdb/engine/tsm1/file_store_test.go

@@ -5,6 +5,8 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
@@ -2188,7 +2190,7 @@ func TestFileStore_Stats(t *testing.T) {
keyValues{"mem", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
}
_, err := newFileDir(dir, data...)
files, err := newFileDir(dir, data...)
if err != nil {
fatal(t, "creating test files", err)
}
@@ -2203,6 +2205,63 @@ func TestFileStore_Stats(t *testing.T) {
if got, exp := len(stats), 3; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
// Another call should result in the same stats being returned.
if got, exp := fs.Stats(), stats; !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, exp %v", got, exp)
}
// Removing one of the files should invalidate the cache.
fs.Remove(files[0])
if got, exp := len(fs.Stats()), 2; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
// Write a new TSM file that is not open
newFile := MustWriteTSM(dir, 4, map[string][]tsm1.Value{
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
})
replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension
if err := os.Rename(newFile, replacement); err != nil {
t.Fatalf("rename: %v", err)
}
// Replace 3 w/ 1
if err := fs.Replace(files, []string{replacement}); err != nil {
t.Fatalf("replace: %v", err)
}
var found bool
stats = fs.Stats()
for _, stat := range stats {
if strings.HasSuffix(stat.Path, "-foo") {
found = true
}
}
if !found {
t.Fatalf("Didn't find %s in stats: %v", "foo", stats)
}
newFile = MustWriteTSM(dir, 5, map[string][]tsm1.Value{
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
})
fd, err := os.Open(newFile)
if err != nil {
t.Fatalf("open file: %v", err)
}
f, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("new reader: %v", err)
}
// Adding some files should invalidate the cache.
fs.Add(f)
if got, exp := len(fs.Stats()), 2; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
}
func TestFileStore_CreateSnapshot(t *testing.T) {
@@ -2363,3 +2422,37 @@ func fatal(t *testing.T, msg string, err error) {
func tsmFileName(id int) string {
return fmt.Sprintf("%09d-%09d.tsm", id, 1)
}
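// fsResult receives the result of each Stats call in the benchmark below so
// the compiler cannot optimise the call away (a standard Go benchmarking idiom).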
var fsResult []tsm1.FileStat
func BenchmarkFileStore_Stats(b *testing.B) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// Create some TSM files...
data := make([]keyValues, 0, 1000)
for i := 0; i < 1000; i++ {
data = append(data, keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}})
}
_, err := newFileDir(dir, data...)
if err != nil {
b.Fatalf("creating benchmark files %v", err)
}
fs := tsm1.NewFileStore(dir)
if !testing.Verbose() {
fs.SetLogOutput(ioutil.Discard)
}
if err := fs.Open(); err != nil {
b.Fatalf("opening file store %v", err)
}
defer fs.Close()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
fsResult = fs.Stats()
}
}
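To run only this benchmark, something along the lines of go test -run '^$' -bench FileStore_Stats ./tsdb/engine/tsm1/ should work; the -run '^$' filter skips the regular tests, and the package path is an assumption based on the influxdb source layout at the time.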