Merge pull request #16762 from influxdata/fix-compact-series-file

fix(tsdb): Fix -compact-series-file flag
pull/16944/head
Ben Johnson 2020-02-10 06:42:20 -07:00 committed by GitHub
commit 6b07ed7200
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 28 deletions

View File

@ -145,6 +145,7 @@ func (cmd *Command) compactDatabaseSeriesFile(dbName, path string) error {
for _, path := range paths {
pathCh <- path
}
close(pathCh)
// Concurrently process each partition in the series file
var g errgroup.Group
@ -182,40 +183,37 @@ func (cmd *Command) compactSeriesFilePartition(path string) error {
const tmpExt = ".tmp"
fmt.Fprintf(cmd.Stdout, "processing partition for %q\n", path)
fis, err := ioutil.ReadDir(path)
// Open partition so index can recover from entries not in the snapshot.
partitionID, err := strconv.Atoi(filepath.Base(path))
if err != nil {
return err
return fmt.Errorf("cannot parse partition id from path: %s", path)
}
indexPath := filepath.Join(path, "index")
index := tsdb.NewSeriesIndex(indexPath)
if err := index.Open(); err != nil {
return err
p := tsdb.NewSeriesPartition(partitionID, path, nil)
if err := p.Open(); err != nil {
return fmt.Errorf("cannot open partition: path=%s err=%s", path, err)
}
defer index.Close()
defer p.Close()
for _, fi := range fis {
segmentID, err := tsdb.ParseSeriesSegmentFilename(fi.Name())
if err != nil {
continue // skip non-segment file.
}
// Loop over segments and compact.
indexPath := p.IndexPath()
var segmentPaths []string
for _, segment := range p.Segments() {
fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", segment.Path(), segment.ID())
segmentPath := filepath.Join(path, fi.Name())
fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", path, segmentID)
segment := tsdb.NewSeriesSegment(segmentID, path)
if err = segment.Open(); err != nil {
return err
} else if err := segment.CompactToPath(segmentPath+tmpExt, index); err != nil {
return err
} else if err := segment.Close(); err != nil {
if err := segment.CompactToPath(segment.Path()+tmpExt, p.Index()); err != nil {
return err
}
segmentPaths = append(segmentPaths, segment.Path())
}
// Close partition.
if err := p.Close(); err != nil {
return err
}
// Remove the old segment files and replace with new ones.
for _, fi := range fis {
dst := filepath.Join(path, fi.Name())
for _, dst := range segmentPaths {
src := dst + tmpExt
fmt.Fprintf(cmd.Stdout, "renaming new segment %q to %q\n", src, dst)
@ -224,13 +222,13 @@ func (cmd *Command) compactSeriesFilePartition(path string) error {
}
}
// Remove index file and then rebuild index
// Remove index file so it will be rebuilt when reopened.
fmt.Fprintln(cmd.Stdout, "removing index file", indexPath)
if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality
return err
}
return index.Close()
return nil
}
// seriesFilePartitionPaths returns the paths to each partition in the series file.
@ -243,7 +241,7 @@ func (cmd *Command) seriesFilePartitionPaths(path string) ([]string, error) {
var paths []string
for _, partition := range sfile.Partitions() {
paths = append(paths, partition.IndexPath())
paths = append(paths, partition.Path())
}
if err := sfile.Close(); err != nil {
return nil, err

View File

@ -423,7 +423,7 @@ func (p *SeriesPartition) EnableCompactions() {
}
func (p *SeriesPartition) compactionsEnabled() bool {
return p.compactionsDisabled == 0
return p.compactionLimiter != nil && p.compactionsDisabled == 0
}
// AppendSeriesIDs returns a list of all series ids.