diff --git a/tsdb/engine.go b/tsdb/engine.go index b4fe3cfa8a..73cf94ff9f 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -49,6 +49,7 @@ type Engine interface { // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic + LastModified() time.Time io.WriterTo } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 9f40733338..e0fc32f809 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -827,6 +827,18 @@ func (e *Engine) SeriesCount() (n int, err error) { return e.index.SeriesN(), nil } +// LastModified returns the time when this shard was last modified +func (e *Engine) LastModified() time.Time { + walTime := e.WAL.LastWriteTime() + fsTime := e.FileStore.LastModified() + + if walTime.After(fsTime) { + return walTime + } + + return fsTime +} + func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done. diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index ed688a7fb8..e26355fb40 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -609,6 +609,63 @@ func TestEngine_DeleteSeries(t *testing.T) { } +func TestEngine_LastModified(t *testing.T) { + // Generate temporary file. + dir, _ := ioutil.TempDir("", "tsm") + walPath := filepath.Join(dir, "wal") + os.MkdirAll(walPath, 0777) + defer os.RemoveAll(dir) + + // Create a few points. + p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") + p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") + p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") + + // Write those points to the engine. + e := tsm1.NewEngine(1, dir, walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) + + // mock the planner so compactions don't run during the test + e.CompactionPlan = &mockPlanner{} + + if lm := e.LastModified(); !lm.IsZero() { + t.Fatalf("expected zero time, got %v", lm.UTC()) + } + + e.SetEnabled(false) + if err := e.Open(); err != nil { + t.Fatalf("failed to open tsm1 engine: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + lm := e.LastModified() + if lm.IsZero() { + t.Fatalf("expected non-zero time, got %v", lm.UTC()) + } + e.SetEnabled(true) + + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + lm2 := e.LastModified() + + if got, exp := lm.Equal(lm2), false; exp != got { + t.Fatalf("expected time change, got %v, exp %v", got, exp) + } + + if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil { + t.Fatalf("failed to delete series: %v", err) + } + + lm3 := e.LastModified() + if got, exp := lm2.Equal(lm3), false; exp != got { + t.Fatalf("expected time change, got %v, exp %v", got, exp) + } +} + func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) { benchmarkEngineCreateIteratorCount(b, 1000) } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index fdac2daeaf..d34ba06da0 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -171,7 +171,7 @@ func NewFileStore(dir string) *FileStore { logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags) fs := &FileStore{ dir: dir, - lastModified: time.Now(), + lastModified: time.Time{}, logger: logger, traceLogger: log.New(ioutil.Discard, "[filestore] ", log.LstdFlags), logOutput: os.Stderr, @@ -347,7 +347,7 @@ func (f *FileStore) Delete(keys []string) error { // DeleteRange removes the values for keys between min and max. func (f *FileStore) DeleteRange(keys []string, min, max int64) error { f.mu.Lock() - f.lastModified = time.Now() + f.lastModified = time.Now().UTC() f.mu.Unlock() return f.walkFiles(func(tsm TSMFile) error { @@ -411,8 +411,12 @@ func (f *FileStore) Open() error { } // Accumulate file store size stat - if fi, err := file.Stat(); err == nil { + fi, err := file.Stat() + if err == nil { atomic.AddInt64(&f.stats.DiskBytes, fi.Size()) + if fi.ModTime().UTC().After(f.lastModified) { + f.lastModified = fi.ModTime().UTC() + } } go func(idx int, file *os.File) { @@ -515,10 +519,14 @@ func (f *FileStore) Stats() []FileStat { } func (f *FileStore) Replace(oldFiles, newFiles []string) error { + if len(oldFiles) == 0 && len(newFiles) == 0 { + return nil + } + f.mu.Lock() defer f.mu.Unlock() - f.lastModified = time.Now() + maxTime := f.lastModified // Copy the current set of active files while we rename // and load the new files. We copy the pointers here to minimize @@ -545,6 +553,13 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { return err } + // Keep track of the new mod time + if stat, err := fd.Stat(); err == nil { + if stat.ModTime().UTC().After(maxTime) { + maxTime = stat.ModTime().UTC() + } + } + tsm, err := NewTSMReader(fd) if err != nil { return err @@ -618,6 +633,15 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { // Tell the purger about our in-use files we need to remove f.purger.add(inuse) + // If times didn't change (which can happen since file mod times are second level), + // then add a ns to the time to ensure that lastModified changes since files on disk + // actually did change + if maxTime.Equal(f.lastModified) { + maxTime = maxTime.UTC().Add(1) + } + + f.lastModified = maxTime.UTC() + f.lastFileStats = nil f.files = active sort.Sort(tsmReaders(f.files)) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 0f8aa0bc7e..6f56fc6942 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -212,13 +212,14 @@ func (l *WAL) Open() error { } totalOldDiskSize += stat.Size() + if stat.ModTime().After(l.lastWriteTime) { + l.lastWriteTime = stat.ModTime().UTC() + } } atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize) l.closing = make(chan struct{}) - l.lastWriteTime = time.Now() - return nil } @@ -462,6 +463,10 @@ func (l *WAL) newSegmentFile() error { } l.currentSegmentWriter = NewWALSegmentWriter(fd) + if stat, err := fd.Stat(); err == nil { + l.lastWriteTime = stat.ModTime() + } + // Reset the current segment size stat atomic.StoreInt64(&l.stats.CurrentBytes, 0) diff --git a/tsdb/shard.go b/tsdb/shard.go index f0b5a09e89..39b0f7031b 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -335,6 +335,14 @@ func (s *Shard) ready() error { return err } +// LastModified returns the time when this shard was last modified +func (s *Shard) LastModified() time.Time { + if err := s.ready(); err != nil { + return time.Time{} + } + return s.engine.LastModified() +} + // DiskSize returns the size on disk of this shard func (s *Shard) DiskSize() (int64, error) { var size int64