diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index dc86206b8d..9257cfb8ef 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -34,6 +34,7 @@ type DevEngine struct { logger *log.Logger WAL *WAL + Cache *Cache Compactor *Compactor RotateFileSize uint32 @@ -55,6 +56,7 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi logger: log.New(os.Stderr, "[tsm1dev] ", log.LstdFlags), WAL: w, + Cache: NewCache(uint64(opt.Config.WALMaxMemorySizeThreshold)), Compactor: c, RotateFileSize: DefaultRotateFileSize, MaxFileSize: MaxDataFileSize, @@ -120,8 +122,13 @@ func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave m } } - _, err := e.WAL.WritePoints(values) - return err + id, err := e.WAL.WritePoints(values) + if err != nil { + return err + } + + // Write data to cache for query purposes. + return e.Cache.WriteMulti(values, uint64(id)) } // DeleteSeries deletes the series from the engine. @@ -168,17 +175,18 @@ func (e *DevEngine) compact() { compact := segments[:n] start := time.Now() - files, err := e.Compactor.Compact(compact) + files, err := e.Compactor.Compact(compact.Names()) if err != nil { e.logger.Printf("error compacting WAL segments: %v", err) } // TODO: this is stubbed out but would be the place to replace files in the // file store with the new compacted versions. - e.replaceFiles(files, compact) + e.replaceFiles(files, compact.Names()) - // TODO: if replacement succeeds, we'd update the cache with the latest checkpoint. - // e.Cache.SetCheckpoint(...) + // Inform cache data may be evicted. + ids := compact.IDs() + e.Cache.SetCheckpoint(uint64(ids[len(ids)-1])) e.logger.Printf("compacted %d segments into %d files in %s", len(compact), len(files), time.Since(start)) } diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index b2e2aba57d..9e2c9b15fc 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -33,6 +33,36 @@ const ( stringEntryType = 4 ) +// SegmentInfo represents metadata about a segment. +type SegmentInfo struct { + name string + id int +} + +type SegmentInfos []SegmentInfo + +func (sis SegmentInfos) Names() []string { + var names []string + for _, s := range sis { + names = append(names, s.name) + } + sort.Strings(names) + return names +} + +func (sis SegmentInfos) IDs() []int { + var ids []int + for _, s := range sis { + id, err := idFromFileName(s.name) + if err != nil { + continue + } + ids = append(ids, id) + } + sort.Ints(ids) + return ids +} + // walEntry is a byte written to a wal segment file that indicates what the following compressed block contains type walEntryType byte @@ -119,7 +149,7 @@ func (l *WAL) WritePoints(values map[string][]Value) (int, error) { return id, nil } -func (l *WAL) ClosedSegments() ([]string, error) { +func (l *WAL) ClosedSegments() (SegmentInfos, error) { l.mu.RLock() var activePath string if l.currentSegmentWriter != nil { @@ -138,16 +168,21 @@ func (l *WAL) ClosedSegments() ([]string, error) { return nil, err } - var names []string + var sis SegmentInfos for _, fn := range files { // Skip the active segment if fn == activePath { continue } - names = append(names, fn) + id, err := idFromFileName(fn) + if err != nil { + return nil, err + } + si := SegmentInfo{name: fn, id: id} + sis = append(sis, si) } - return names, nil + return sis, nil } func (l *WAL) writeToLog(entry WALEntry) (int, error) {