Update index when import files

pull/8337/head
Jason Wilder 2017-04-28 13:17:21 -06:00
parent a76146e34a
commit 141f0d71cd
1 changed files with 54 additions and 9 deletions
tsdb/engine/tsm1

View File

@ -628,7 +628,7 @@ func (e *Engine) Import(r io.Reader, basePath string) error {
// existing file with the same name in the backup exists. // existing file with the same name in the backup exists.
func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
// Copy files from archive while under lock to prevent reopening. // Copy files from archive while under lock to prevent reopening.
if err := func() error { newFiles, err := func() ([]string, error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
@ -638,21 +638,70 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
if fileName, err := e.readFileFromBackup(tr, basePath, asNew); err == io.EOF { if fileName, err := e.readFileFromBackup(tr, basePath, asNew); err == io.EOF {
break break
} else if err != nil { } else if err != nil {
return err return nil, err
} else if fileName != "" { } else if fileName != "" {
newFiles = append(newFiles, fileName) newFiles = append(newFiles, fileName)
} }
} }
if err := syncDir(e.path); err != nil { if err := syncDir(e.path); err != nil {
return err return nil, err
} }
return e.FileStore.Replace(nil, newFiles) if err := e.FileStore.Replace(nil, newFiles); err != nil {
}(); err != nil { return nil, err
}
return newFiles, nil
}()
if err != nil {
return err return err
} }
// Load any new series keys to the index
readers := make([]chan seriesKey, 0, len(newFiles))
for _, f := range newFiles {
ch := make(chan seriesKey, 1)
readers = append(readers, ch)
// If asNew is true, the files created from readFileFromBackup will be new ones
// having a temp extension.
f = strings.TrimSuffix(f, ".tmp")
fd, err := os.Open(f)
if err != nil {
return err
}
r, err := NewTSMReader(fd)
if err != nil {
return err
}
defer r.Close()
go func(c chan seriesKey, r *TSMReader) {
n := r.KeyCount()
for i := 0; i < n; i++ {
key, typ := r.KeyAt(i)
c <- seriesKey{key, typ}
}
close(c)
}(ch, r)
}
// Merge and dedup all the series keys across each reader to reduce
// lock contention on the index.
merged := merge(readers...)
for v := range merged {
fieldType, err := tsmFieldTypeToInfluxQLDataType(v.typ)
if err != nil {
return err
}
if err := e.addToIndexFromKey(v.key, fieldType, e.index); err != nil {
return err
}
}
return nil return nil
} }
@ -702,10 +751,6 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
return "", err return "", err
} }
if err := f.Close(); err != nil {
return "", err
}
return tmp, nil return tmp, nil
} }