From 141f0d71cd5a60c6f0186570f54f09a6d17ad0f4 Mon Sep 17 00:00:00 2001 From: Jason Wilder <mail@jasonwilder.com> Date: Fri, 28 Apr 2017 13:17:21 -0600 Subject: [PATCH] Update index when import files --- tsdb/engine/tsm1/engine.go | 63 ++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 484243713c..125a27e856 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -628,7 +628,7 @@ func (e *Engine) Import(r io.Reader, basePath string) error { // existing file with the same name in the backup exists. func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { // Copy files from archive while under lock to prevent reopening. - if err := func() error { + newFiles, err := func() ([]string, error) { e.mu.Lock() defer e.mu.Unlock() @@ -638,21 +638,70 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { if fileName, err := e.readFileFromBackup(tr, basePath, asNew); err == io.EOF { break } else if err != nil { - return err + return nil, err } else if fileName != "" { newFiles = append(newFiles, fileName) } } if err := syncDir(e.path); err != nil { - return err + return nil, err } - return e.FileStore.Replace(nil, newFiles) - }(); err != nil { + if err := e.FileStore.Replace(nil, newFiles); err != nil { + return nil, err + } + return newFiles, nil + }() + + if err != nil { return err } + // Load any new series keys to the index + readers := make([]chan seriesKey, 0, len(newFiles)) + for _, f := range newFiles { + ch := make(chan seriesKey, 1) + readers = append(readers, ch) + + // If asNew is true, the files created from readFileFromBackup will be new ones + // having a temp extension. + f = strings.TrimSuffix(f, ".tmp") + + fd, err := os.Open(f) + if err != nil { + return err + } + + r, err := NewTSMReader(fd) + if err != nil { + return err + } + defer r.Close() + + go func(c chan seriesKey, r *TSMReader) { + n := r.KeyCount() + for i := 0; i < n; i++ { + key, typ := r.KeyAt(i) + c <- seriesKey{key, typ} + } + close(c) + }(ch, r) + } + + // Merge and dedup all the series keys across each reader to reduce + // lock contention on the index. + merged := merge(readers...) + for v := range merged { + fieldType, err := tsmFieldTypeToInfluxQLDataType(v.typ) + if err != nil { + return err + } + + if err := e.addToIndexFromKey(v.key, fieldType, e.index); err != nil { + return err + } + } return nil } @@ -702,10 +751,6 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as return "", err } - if err := f.Close(); err != nil { - return "", err - } - return tmp, nil }