Merge pull request #5758 from influxdata/mr-disk-stats

Track cache, WAL, filestore stats within tsm1 engine
pull/5786/head
Mark Rushakoff 2016-02-22 13:01:55 -08:00
commit fc5c8597ab
7 changed files with 210 additions and 25 deletions

CHANGELOG.md

@ -16,6 +16,7 @@
- [#5366](https://github.com/influxdata/influxdb/pull/5366): Enabled golint for influxql. @gabelev
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup
- [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped.
- [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour
### Bugfixes

tsdb/engine/tsm1/cache.go

@ -1,11 +1,15 @@
package tsm1
import (
"expvar"
"fmt"
"log"
"os"
"sort"
"sync"
"time"
"github.com/influxdata/influxdb"
)
var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
@ -58,6 +62,21 @@ func (e *entry) deduplicate() {
e.needSort = false
}
// Statistics gathered by the Cache.
const (
// levels - point in time measures
statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes
statSnapshots = "snapshotCount" // level: Number of active snapshots.
statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since the cache was last snapshotted, at sample time
// counters - cumulative measures
statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots
)
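The level/counter split maps directly onto expvar semantics: levels are gauges overwritten with Set at sample time, while counters only ever accumulate via Add. A minimal stdlib sketch of the distinction (the map name is illustrative, not the key this PR registers):

package main

import (
    "expvar"
    "fmt"
)

func main() {
    m := expvar.NewMap("tsm1_cache_example") // illustrative name only

    // level: overwrite with the value observed right now
    mem := new(expvar.Int)
    mem.Set(4096)
    m.Set("memBytes", mem)

    // counter: accumulate deltas over the process lifetime
    m.Add("cachedBytes", 1024)
    m.Add("cachedBytes", 2048)

    fmt.Println(m.String()) // {"cachedBytes": 3072, "memBytes": 4096}
}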
// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
mu sync.RWMutex
@ -70,30 +89,47 @@ type Cache struct {
// they are read only and should never be modified
snapshots []*Cache
snapshotsSize uint64
statMap *expvar.Map // nil for snapshots.
lastSnapshot time.Time
}
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
func NewCache(maxSize uint64) *Cache {
return &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
// Only used for engine caches, never for snapshots
func NewCache(maxSize uint64, path string) *Cache {
c := &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
lastSnapshot: time.Now(),
}
c.UpdateAge()
c.UpdateCompactTime(0)
c.updateCachedBytes(0)
c.updateMemSize(0)
c.updateSnapshots()
return c
}
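Because the statistics key embeds the shard path ("tsm1_cache:"+path), each engine-owned cache registers its own expvar map. A hedged usage sketch — the paths below are hypothetical:

// Two shards produce two independently addressable stat maps.
c1 := NewCache(1<<30, "/var/lib/influxdb/data/db0/rp0/1") // key: "tsm1_cache:/var/lib/influxdb/data/db0/rp0/1"
c2 := NewCache(1<<30, "/var/lib/influxdb/data/db0/rp0/2") // key: "tsm1_cache:/var/lib/influxdb/data/db0/rp0/2"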
// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache has exceeded its max size.
func (c *Cache) Write(key string, values []Value) error {
c.mu.Lock()
defer c.mu.Unlock()
// Enough room in the cache?
newSize := c.size + uint64(Values(values).Size())
addedSize := Values(values).Size()
newSize := c.size + uint64(addedSize)
if c.maxSize > 0 && newSize+c.snapshotsSize > c.maxSize {
c.mu.Unlock()
return ErrCacheMemoryExceeded
}
c.write(key, values)
c.size = newSize
c.mu.Unlock()
// Update the memory size stat
c.updateMemSize(int64(addedSize))
return nil
}
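Note that Write now pairs explicit Unlock calls with the stat update instead of the previous defer, so the expvar write happens outside the cache's critical section. The shape of that pattern as a self-contained sketch (names hypothetical):

package main

import (
    "expvar"
    "sync"
)

type store struct {
    mu    sync.Mutex
    size  int64
    stats *expvar.Map
}

func (s *store) put(delta int64) {
    s.mu.Lock()
    s.size += delta
    s.mu.Unlock() // release first: expvar does its own locking

    s.stats.Add("bytes", delta) // no need to hold s.mu here
}

func main() {
    s := &store{stats: expvar.NewMap("store_example")}
    s.put(128)
}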
@ -122,6 +158,9 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
c.size = newSize
c.mu.Unlock()
// Update the memory size stat
c.updateMemSize(int64(totalSz))
return nil
}
@ -131,16 +170,22 @@ func (c *Cache) Snapshot() *Cache {
c.mu.Lock()
defer c.mu.Unlock()
snapshot := NewCache(c.maxSize)
snapshot.store = c.store
snapshot.size = c.size
snapshot := &Cache{
store: c.store,
size: c.size,
}
c.store = make(map[string]*entry)
c.size = 0
c.lastSnapshot = time.Now()
c.snapshots = append(c.snapshots, snapshot)
c.snapshotsSize += snapshot.size
c.updateMemSize(-int64(snapshot.size))
c.updateCachedBytes(snapshot.size)
c.updateSnapshots()
return snapshot
}
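Snapshot builds the snapshot cache as a bare struct literal rather than going through NewCache, so no expvar map is ever registered for it — statMap stays nil, matching the "nil for snapshots" field comment above, and all stat updates flow through the parent cache. An illustrative guard, not present in this PR:

// safeAdd is hypothetical; it spells out the invariant the real code relies on.
func (c *Cache) safeAdd(key string, v int64) {
    if c.statMap == nil {
        return // snapshot caches carry no stats
    }
    c.statMap.Add(key, v)
}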
@ -165,6 +210,8 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
break
}
}
c.updateSnapshots()
}
// Size returns the number of point-calculated bytes the cache currently uses.
@ -381,3 +428,39 @@ func (cl *CacheLoader) Load(cache *Cache) error {
}
return nil
}
// Updates the age statistic
func (c *Cache) UpdateAge() {
c.mu.RLock()
defer c.mu.RUnlock()
ageStat := new(expvar.Int)
ageStat.Set(int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond))
c.statMap.Set(statCacheAgeMs, ageStat)
}
// Updates WAL compaction time statistic
func (c *Cache) UpdateCompactTime(d time.Duration) {
c.statMap.Add(statWALCompactionTimeMs, int64(d/time.Millisecond))
}
// Update the cachedBytes counter
func (c *Cache) updateCachedBytes(b uint64) {
c.statMap.Add(statCachedBytes, int64(b))
}
// Update the memSize level
func (c *Cache) updateMemSize(b int64) {
c.statMap.Add(statCacheMemoryBytes, b)
}
// Update the snapshotsCount and the diskSize levels
func (c *Cache) updateSnapshots() {
// Update disk stats
diskSizeStat := new(expvar.Int)
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)
snapshotsStat := new(expvar.Int)
snapshotsStat.Set(int64(len(c.snapshots)))
c.statMap.Set(statSnapshots, snapshotsStat)
}

tsdb/engine/tsm1/cache_test.go

@ -12,7 +12,7 @@ import (
)
func TestCache_NewCache(t *testing.T) {
c := NewCache(100)
c := NewCache(100, "")
if c == nil {
t.Fatalf("failed to create new cache")
}
@ -35,7 +35,7 @@ func TestCache_CacheWrite(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, "")
if err := c.Write("foo", values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -59,7 +59,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3 * valuesSize)
c := NewCache(3*valuesSize, "")
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -115,7 +115,7 @@ func TestCache_CacheValues(t *testing.T) {
v3 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)
c := NewCache(512)
c := NewCache(512, "")
if deduped := c.Values("no such key"); deduped != nil {
t.Fatalf("Values returned for no such key")
}
@ -141,7 +141,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
c := NewCache(512)
c := NewCache(512, "")
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
}
@ -185,7 +185,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
}
func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512)
c := NewCache(512, "")
// Grab snapshot, and ensure it's as expected.
snapshot := c.Snapshot()
@ -209,7 +209,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
c := NewCache(uint64(v1.Size()))
c := NewCache(uint64(v1.Size()), "")
if err := c.Write("foo", Values{v0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -265,7 +265,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}
// Load the cache using the segment.
cache := NewCache(1024)
cache := NewCache(1024, "")
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -288,7 +288,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}
// Reload the cache using the segment.
cache = NewCache(1024)
cache = NewCache(1024, "")
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -347,7 +347,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
}
// Load the cache using the segments.
cache := NewCache(1024)
cache := NewCache(1024, "")
loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -397,7 +397,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
func BenchmarkCacheFloatEntries(b *testing.B) {
for i := 0; i < b.N; i++ {
cache := NewCache(10000)
cache := NewCache(10000, "")
for j := 0; j < 10000; j++ {
v := NewValue(time.Unix(1, 0), float64(j))
cache.Write("test", []Value{v})

tsdb/engine/tsm1/compact_test.go

@ -25,7 +25,7 @@ func TestCompactor_Snapshot(t *testing.T) {
"cpu,host=B#!~#value": []tsm1.Value{v2, v3},
}
c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")
for k, v := range points1 {
if err := c.Write(k, v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -497,7 +497,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v0},
}
c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")
for k, v := range writes {
if err := c.Write(k, v); err != nil {
@ -545,7 +545,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v0, v1},
}
c := tsm1.NewCache(0)
c := tsm1.NewCache(0, "")
for k, v := range writes {
if err := c.Write(k, v); err != nil {

tsdb/engine/tsm1/engine.go

@ -70,7 +70,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
fs := NewFileStore(path)
fs.traceLogging = opt.Config.DataLoggingEnabled
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)
c := &Compactor{
Dir: path,
@ -410,10 +410,22 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot
var started *time.Time
defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Now().Sub(*started))
}
}()
closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
e.mu.Lock()
defer e.mu.Unlock()
now := time.Now()
started = &now
if err := e.WAL.CloseSegment(); err != nil {
return nil, nil, nil, err
}
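The started pointer lets the deferred stat update distinguish "timing never began" (early error paths leave it nil) from "began at t". The pattern in isolation, with hypothetical names:

func timedWork(record func(time.Duration)) error {
    var started *time.Time
    defer func() {
        if started != nil {
            record(time.Since(*started)) // only if the work actually began
        }
    }()

    now := time.Now()
    started = &now
    // ... the timed section ...
    return nil
}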
@ -477,6 +489,7 @@ func (e *Engine) compactCache() {
return
default:
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
err := e.WriteSnapshot()
if err != nil {
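compactCache (its loop context is truncated in this hunk) polls for work, and refreshing the age gauge on every pass keeps cacheAgeMs current even when no snapshot gets written. A hedged sketch of the loop shape — the closing channel, helper funcs, and one-second cadence are assumptions, not code from this PR:

func compactLoop(c *Cache, closing <-chan struct{}, shouldCompact func() bool, writeSnapshot func() error) {
    for {
        select {
        case <-closing:
            return
        default:
            c.UpdateAge() // keep cacheAgeMs fresh between snapshots
            if shouldCompact() {
                _ = writeSnapshot() // the real engine logs this error
            }
        }
        time.Sleep(time.Second) // assumed cadence
    }
}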

tsdb/engine/tsm1/file_store.go

@ -1,6 +1,7 @@
package tsm1
import (
"expvar"
"fmt"
"log"
"os"
@ -10,6 +11,8 @@ import (
"strings"
"sync"
"time"
"github.com/influxdata/influxdb"
)
type TSMFile interface {
@ -79,6 +82,11 @@ type TSMFile interface {
BlockIterator() *BlockIterator
}
// Statistics gathered by the FileStore.
const (
statFileStoreBytes = "diskBytes"
)
type FileStore struct {
mu sync.RWMutex
lastModified time.Time
@ -90,6 +98,8 @@ type FileStore struct {
Logger *log.Logger
traceLogging bool
statMap *expvar.Map
}
type FileStat struct {
@ -118,7 +128,8 @@ func NewFileStore(dir string) *FileStore {
return &FileStore{
dir: dir,
lastModified: time.Now(),
Logger: log.New(os.Stderr, "[filestore]", log.LstdFlags),
Logger: log.New(os.Stderr, "[filestore] ", log.LstdFlags),
statMap: influxdb.NewStatistics("tsm1_filestore:"+dir, "tsm1_filestore", map[string]string{"path": dir}),
}
}
@ -154,6 +165,9 @@ func (f *FileStore) NextGeneration() int {
func (f *FileStore) Add(files ...TSMFile) {
f.mu.Lock()
defer f.mu.Unlock()
for _, file := range files {
f.statMap.Add(statFileStoreBytes, int64(file.Size()))
}
f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
}
@ -175,6 +189,9 @@ func (f *FileStore) Remove(paths ...string) {
if keep {
active = append(active, file)
} else {
// Removing the file; subtract its size from the total file store bytes
f.statMap.Add(statFileStoreBytes, -int64(file.Size()))
}
}
f.files = active
@ -263,6 +280,11 @@ func (f *FileStore) Open() error {
return fmt.Errorf("error opening file %s: %v", fn, err)
}
// Accumulate file store size stat
if fi, err := file.Stat(); err == nil {
f.statMap.Add(statFileStoreBytes, fi.Size())
}
go func(idx int, file *os.File) {
start := time.Now()
df, err := NewTSMReaderWithOptions(TSMReaderOptions{
@ -412,6 +434,15 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
f.files = active
sort.Sort(tsmReaders(f.files))
// Recalculate the disk size stat
var totalSize int64
for _, file := range f.files {
totalSize += int64(file.Size())
}
sizeStat := new(expvar.Int)
sizeStat.Set(totalSize)
f.statMap.Set(statFileStoreBytes, sizeStat)
return nil
}
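Two accounting styles coexist in the file store: Add, Remove, and Open apply signed deltas to the gauge, while Replace rebuilds it from the authoritative file list, correcting any drift the deltas might have accumulated. A minimal sketch of the rebuild (helper name hypothetical):

func resyncDiskBytes(m *expvar.Map, files []TSMFile) {
    var total int64
    for _, f := range files {
        total += int64(f.Size())
    }
    v := new(expvar.Int)
    v.Set(total)
    m.Set("diskBytes", v) // overwrite rather than accumulate
}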

tsdb/engine/tsm1/wal.go

@ -3,6 +3,7 @@ package tsm1
import (
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"log"
@ -16,6 +17,7 @@ import (
"time"
"github.com/golang/snappy"
"github.com/influxdata/influxdb"
)
const (
@ -54,6 +56,12 @@ const (
var ErrWALClosed = fmt.Errorf("WAL closed")
// Statistics gathered by the WAL.
const (
statWALOldBytes = "oldSegmentsDiskBytes"
statWALCurrentBytes = "currentSegmentDiskBytes"
)
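These two keys split the WAL's footprint: currentSegmentDiskBytes is a gauge for the live segment (Set on every write), while oldSegmentsDiskBytes covers closed segments (Add on rollover, recomputed from disk in Open and Remove). A sketch of the rollover handoff implied by newSegmentFile below (helper name hypothetical):

func rollover(m *expvar.Map, closedSize int64) {
    m.Add("oldSegmentsDiskBytes", closedSize) // the closed segment joins the old total

    cur := new(expvar.Int) // and the live-segment gauge resets to zero
    cur.Set(0)
    m.Set("currentSegmentDiskBytes", cur)
}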
type WAL struct {
mu sync.RWMutex
lastWriteTime time.Time
@ -76,6 +84,8 @@ type WAL struct {
// LoggingEnabled specifies if detailed logs should be output
LoggingEnabled bool
statMap *expvar.Map
}
func NewWAL(path string) *WAL {
@ -87,6 +97,8 @@ func NewWAL(path string) *WAL {
SegmentSize: DefaultSegmentSize,
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),
statMap: influxdb.NewStatistics("tsm1_wal:"+path, "tsm1_wal", map[string]string{"path": path}),
}
}
@ -130,12 +142,26 @@ func (l *WAL) Open() error {
if stat.Size() == 0 {
os.Remove(lastSegment)
segments = segments[:len(segments)-1]
}
if err := l.newSegmentFile(); err != nil {
return err
}
}
var totalOldDiskSize int64
for _, seg := range segments {
stat, err := os.Stat(seg)
if err != nil {
return err
}
totalOldDiskSize += stat.Size()
}
sizeStat := new(expvar.Int)
sizeStat.Set(totalOldDiskSize)
l.statMap.Set(statWALOldBytes, sizeStat)
l.closing = make(chan struct{})
l.lastWriteTime = time.Now()
@ -196,6 +222,26 @@ func (l *WAL) Remove(files []string) error {
for _, fn := range files {
os.RemoveAll(fn)
}
// Refresh the on-disk size stats
segments, err := segmentFileNames(l.path)
if err != nil {
return err
}
var totalOldDiskSize int64
for _, seg := range segments {
stat, err := os.Stat(seg)
if err != nil {
return err
}
totalOldDiskSize += stat.Size()
}
sizeStat := new(expvar.Int)
sizeStat.Set(totalOldDiskSize)
l.statMap.Set(statWALOldBytes, sizeStat)
return nil
}
@ -240,6 +286,11 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
return -1, fmt.Errorf("error writing WAL entry: %v", err)
}
// Update stats for current segment size
curSize := new(expvar.Int)
curSize.Set(int64(l.currentSegmentWriter.size))
l.statMap.Set(statWALCurrentBytes, curSize)
l.lastWriteTime = time.Now()
return l.currentSegmentID, l.currentSegmentWriter.sync()
@ -324,6 +375,7 @@ func (l *WAL) newSegmentFile() error {
if err := l.currentSegmentWriter.close(); err != nil {
return err
}
l.statMap.Add(statWALOldBytes, int64(l.currentSegmentWriter.size))
}
fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
@ -333,6 +385,11 @@ func (l *WAL) newSegmentFile() error {
}
l.currentSegmentWriter = NewWALSegmentWriter(fd)
// Reset the current segment size stat
curSize := new(expvar.Int)
curSize.Set(0)
l.statMap.Set(statWALCurrentBytes, curSize)
return nil
}
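All three maps this PR registers (tsm1_cache:<path>, tsm1_wal:<path>, tsm1_filestore:<dir>) live in the process-wide expvar registry, so they surface wherever the server exposes expvar, such as its /debug/vars endpoint. A minimal in-process sketch of walking them — the key prefixes come from this PR, everything else is illustrative:

package main

import (
    "expvar"
    "fmt"
    "strings"
)

func main() {
    expvar.Do(func(kv expvar.KeyValue) {
        if strings.HasPrefix(kv.Key, "tsm1_cache:") ||
            strings.HasPrefix(kv.Key, "tsm1_wal:") ||
            strings.HasPrefix(kv.Key, "tsm1_filestore:") {
            fmt.Printf("%s => %s\n", kv.Key, kv.Value)
        }
    })
}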