Merge pull request #7651 from influxdata/jw-shard-last-modified

Expose Shard.LastModified
Jason Wilder 2016-11-23 10:19:26 -07:00 committed by GitHub
commit 27d157763a
6 changed files with 113 additions and 6 deletions

View File

@@ -49,6 +49,7 @@ type Engine interface {
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
io.WriterTo
}
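
With LastModified now part of the Engine interface, every engine implementation must provide it. A hedged sketch (not part of this commit) of the usual Go compile-time guard for that:

// Hypothetical assertion, not in this diff: fails to compile if
// *tsm1.Engine ever stops satisfying tsdb.Engine, e.g. by missing
// the new LastModified method.
var _ tsdb.Engine = (*tsm1.Engine)(nil)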

View File

@@ -827,6 +827,18 @@ func (e *Engine) SeriesCount() (n int, err error) {
return e.index.SeriesN(), nil
}
// LastModified returns the time when this shard was last modified
func (e *Engine) LastModified() time.Time {
walTime := e.WAL.LastWriteTime()
fsTime := e.FileStore.LastModified()
if walTime.After(fsTime) {
return walTime
}
return fsTime
}
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
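
Engine.LastModified simply reports the later of the WAL's last write time and the FileStore's last modification. A standalone, runnable sketch of that pick-the-later pattern:

package main

import (
	"fmt"
	"time"
)

// latest returns the later of two times, mirroring how
// Engine.LastModified chooses between the WAL and the FileStore.
func latest(a, b time.Time) time.Time {
	if a.After(b) {
		return a
	}
	return b
}

func main() {
	wal := time.Date(2016, 11, 23, 10, 0, 0, 0, time.UTC)
	fs := wal.Add(-time.Minute)
	fmt.Println(latest(wal, fs)) // WAL wins: it was written more recently
}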

View File

@@ -609,6 +609,63 @@ func TestEngine_DeleteSeries(t *testing.T) {
}
func TestEngine_LastModified(t *testing.T) {
// Create a temporary directory for the test data.
dir, _ := ioutil.TempDir("", "tsm")
walPath := filepath.Join(dir, "wal")
os.MkdirAll(walPath, 0777)
defer os.RemoveAll(dir)
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
// Write those points to the engine.
e := tsm1.NewEngine(1, dir, walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if lm := e.LastModified(); !lm.IsZero() {
t.Fatalf("expected zero time, got %v", lm.UTC())
}
e.SetEnabled(false)
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
lm := e.LastModified()
if lm.IsZero() {
t.Fatalf("expected non-zero time, got %v", lm.UTC())
}
e.SetEnabled(true)
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
lm2 := e.LastModified()
if got, exp := lm.Equal(lm2), false; exp != got {
t.Fatalf("expected time change, got %v, exp %v", got, exp)
}
if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
lm3 := e.LastModified()
if got, exp := lm2.Equal(lm3), false; exp != got {
t.Fatalf("expected time change, got %v, exp %v", got, exp)
}
}
func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
benchmarkEngineCreateIteratorCount(b, 1000)
}
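
The test swaps in mockPlanner so background compactions can't touch file mod times mid-test. Its definition isn't shown in this diff; a minimal no-op sketch, assuming the era's tsm1.CompactionPlanner interface (Plan, PlanLevel, PlanOptimize) and the test file's existing imports:

// Sketch only; the real mockPlanner lives elsewhere in engine_test.go.
// The method set is an assumption based on the interface at the time.
type mockPlanner struct{}

func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup      { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup            { return nil }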

View File

@@ -171,7 +171,7 @@ func NewFileStore(dir string) *FileStore {
logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags)
fs := &FileStore{
dir: dir,
lastModified: time.Now(),
lastModified: time.Time{},
logger: logger,
traceLogger: log.New(ioutil.Discard, "[filestore] ", log.LstdFlags),
logOutput: os.Stderr,
@@ -347,7 +347,7 @@ func (f *FileStore) Delete(keys []string) error {
// DeleteRange removes the values for keys between min and max.
func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
f.mu.Lock()
f.lastModified = time.Now()
f.lastModified = time.Now().UTC()
f.mu.Unlock()
return f.walkFiles(func(tsm TSMFile) error {
@@ -411,8 +411,12 @@ func (f *FileStore) Open() error {
}
// Accumulate file store size stat
if fi, err := file.Stat(); err == nil {
fi, err := file.Stat()
if err == nil {
atomic.AddInt64(&f.stats.DiskBytes, fi.Size())
if fi.ModTime().UTC().After(f.lastModified) {
f.lastModified = fi.ModTime().UTC()
}
}
go func(idx int, file *os.File) {
@@ -515,10 +519,14 @@ func (f *FileStore) Stats() []FileStat {
}
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
if len(oldFiles) == 0 && len(newFiles) == 0 {
return nil
}
f.mu.Lock()
defer f.mu.Unlock()
f.lastModified = time.Now()
maxTime := f.lastModified
// Copy the current set of active files while we rename
// and load the new files. We copy the pointers here to minimize
@@ -545,6 +553,13 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
return err
}
// Keep track of the new mod time
if stat, err := fd.Stat(); err == nil {
if stat.ModTime().UTC().After(maxTime) {
maxTime = stat.ModTime().UTC()
}
}
tsm, err := NewTSMReader(fd)
if err != nil {
return err
@@ -618,6 +633,15 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
// Tell the purger about our in-use files we need to remove
f.purger.add(inuse)
// If the times didn't change (possible since file mod times have
// second-level granularity), add a nanosecond so lastModified still
// advances, because the files on disk actually did change
if maxTime.Equal(f.lastModified) {
maxTime = maxTime.UTC().Add(1)
}
f.lastModified = maxTime.UTC()
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
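
Many filesystems record mod times with only second granularity, so a Replace can observe a timestamp identical to the recorded lastModified even though the files changed. The one-nanosecond bump above guarantees the value still advances. A self-contained sketch of that logic:

package main

import (
	"fmt"
	"time"
)

// bump mirrors the Replace logic: if the newest file mod time equals
// the previously recorded lastModified, nudge it forward by one
// nanosecond so callers comparing the two values still see a change.
func bump(lastModified, maxTime time.Time) time.Time {
	if maxTime.Equal(lastModified) {
		maxTime = maxTime.UTC().Add(1) // Add(1) adds one nanosecond
	}
	return maxTime.UTC()
}

func main() {
	t := time.Date(2016, 11, 23, 10, 19, 26, 0, time.UTC)
	fmt.Println(bump(t, t))                    // same second: bumped by 1ns
	fmt.Println(bump(t, t.Add(2*time.Second))) // already newer: unchanged
}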

View File

@@ -212,13 +212,14 @@ func (l *WAL) Open() error {
}
totalOldDiskSize += stat.Size()
if stat.ModTime().After(l.lastWriteTime) {
l.lastWriteTime = stat.ModTime().UTC()
}
}
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
l.closing = make(chan struct{})
l.lastWriteTime = time.Now()
return nil
}
@@ -462,6 +463,10 @@ func (l *WAL) newSegmentFile() error {
}
l.currentSegmentWriter = NewWALSegmentWriter(fd)
if stat, err := fd.Stat(); err == nil {
l.lastWriteTime = stat.ModTime()
}
// Reset the current segment size stat
atomic.StoreInt64(&l.stats.CurrentBytes, 0)
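
Both WAL changes follow the same idea: derive the last-write time from the segment files on disk rather than from time.Now(), so the value stays meaningful across restarts. A runnable sketch of that pattern (the directory scan is illustrative, not lifted from this PR):

package main

import (
	"fmt"
	"io/ioutil"
	"time"
)

// lastWriteFromDir returns the newest mod time among the files in dir,
// mirroring the WAL.Open loop above.
func lastWriteFromDir(dir string) (time.Time, error) {
	fis, err := ioutil.ReadDir(dir)
	if err != nil {
		return time.Time{}, err
	}
	var last time.Time
	for _, fi := range fis {
		if fi.ModTime().UTC().After(last) {
			last = fi.ModTime().UTC()
		}
	}
	return last, nil
}

func main() {
	last, err := lastWriteFromDir(".")
	fmt.Println(last, err)
}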

View File

@@ -335,6 +335,14 @@ func (s *Shard) ready() error {
return err
}
// LastModified returns the time when this shard was last modified
func (s *Shard) LastModified() time.Time {
if err := s.ready(); err != nil {
return time.Time{}
}
return s.engine.LastModified()
}
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
var size int64
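
With Shard.LastModified exposed, code outside the engine can finally ask how stale a shard is. A hypothetical consumer (coldShards is illustrative, not part of this PR; assumes the tsdb package's imports) might use it to find shards idle past a threshold:

// Hypothetical helper, not in this PR: report shards with no write or
// compaction within the given idle window. A zero LastModified means
// the shard isn't ready yet, so it is skipped.
func coldShards(shards []*tsdb.Shard, idle time.Duration) []*tsdb.Shard {
	var cold []*tsdb.Shard
	now := time.Now().UTC()
	for _, sh := range shards {
		if lm := sh.LastModified(); !lm.IsZero() && now.Sub(lm) > idle {
			cold = append(cold, sh)
		}
	}
	return cold
}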