Integrate cache with tsm1dev write path

pull/4911/head
Philip O'Toole 2015-11-25 12:51:56 -08:00
parent 1bca38bb84
commit 8649ce4c49
2 changed files with 53 additions and 10 deletions

View File

@ -34,6 +34,7 @@ type DevEngine struct {
logger *log.Logger
WAL *WAL
Cache *Cache
Compactor *Compactor
RotateFileSize uint32
@ -55,6 +56,7 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
logger: log.New(os.Stderr, "[tsm1dev] ", log.LstdFlags),
WAL: w,
Cache: NewCache(uint64(opt.Config.WALMaxMemorySizeThreshold)),
Compactor: c,
RotateFileSize: DefaultRotateFileSize,
MaxFileSize: MaxDataFileSize,
@ -120,8 +122,13 @@ func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave m
}
}
_, err := e.WAL.WritePoints(values)
return err
id, err := e.WAL.WritePoints(values)
if err != nil {
return err
}
// Write data to cache for query purposes.
return e.Cache.WriteMulti(values, uint64(id))
}
// DeleteSeries deletes the series from the engine.
@ -168,17 +175,18 @@ func (e *DevEngine) compact() {
compact := segments[:n]
start := time.Now()
files, err := e.Compactor.Compact(compact)
files, err := e.Compactor.Compact(compact.Names())
if err != nil {
e.logger.Printf("error compacting WAL segments: %v", err)
}
// TODO: this is stubbed out but would be the place to replace files in the
// file store with the new compacted versions.
e.replaceFiles(files, compact)
e.replaceFiles(files, compact.Names())
// TODO: if replacement succeeds, we'd update the cache with the latest checkpoint.
// e.Cache.SetCheckpoint(...)
// Inform cache data may be evicted.
ids := compact.IDs()
e.Cache.SetCheckpoint(uint64(ids[len(ids)-1]))
e.logger.Printf("compacted %d segments into %d files in %s", len(compact), len(files), time.Since(start))
}

View File

@ -33,6 +33,36 @@ const (
stringEntryType = 4
)
// SegmentInfo represents metadata about a segment.
type SegmentInfo struct {
name string
id int
}
type SegmentInfos []SegmentInfo
func (sis SegmentInfos) Names() []string {
var names []string
for _, s := range sis {
names = append(names, s.name)
}
sort.Strings(names)
return names
}
func (sis SegmentInfos) IDs() []int {
var ids []int
for _, s := range sis {
id, err := idFromFileName(s.name)
if err != nil {
continue
}
ids = append(ids, id)
}
sort.Ints(ids)
return ids
}
// walEntry is a byte written to a wal segment file that indicates what the following compressed block contains
type walEntryType byte
@ -119,7 +149,7 @@ func (l *WAL) WritePoints(values map[string][]Value) (int, error) {
return id, nil
}
func (l *WAL) ClosedSegments() ([]string, error) {
func (l *WAL) ClosedSegments() (SegmentInfos, error) {
l.mu.RLock()
var activePath string
if l.currentSegmentWriter != nil {
@ -138,16 +168,21 @@ func (l *WAL) ClosedSegments() ([]string, error) {
return nil, err
}
var names []string
var sis SegmentInfos
for _, fn := range files {
// Skip the active segment
if fn == activePath {
continue
}
names = append(names, fn)
id, err := idFromFileName(fn)
if err != nil {
return nil, err
}
si := SegmentInfo{name: fn, id: id}
sis = append(sis, si)
}
return names, nil
return sis, nil
}
func (l *WAL) writeToLog(entry WALEntry) (int, error) {