From 7a9eb1420c44386190454a6e9c2b39b15ddf2acb Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 5 Feb 2020 12:43:04 -0700 Subject: [PATCH] fix(tsdb): Fix -compact-series-file flag --- cmd/influx_inspect/buildtsi/buildtsi.go | 52 ++++++++++++------------- tsdb/series_partition.go | 2 +- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/cmd/influx_inspect/buildtsi/buildtsi.go b/cmd/influx_inspect/buildtsi/buildtsi.go index b41a7091e0..aa59463260 100644 --- a/cmd/influx_inspect/buildtsi/buildtsi.go +++ b/cmd/influx_inspect/buildtsi/buildtsi.go @@ -145,6 +145,7 @@ func (cmd *Command) compactDatabaseSeriesFile(dbName, path string) error { for _, path := range paths { pathCh <- path } + close(pathCh) // Concurrently process each partition in the series file var g errgroup.Group @@ -182,40 +183,37 @@ func (cmd *Command) compactSeriesFilePartition(path string) error { const tmpExt = ".tmp" fmt.Fprintf(cmd.Stdout, "processing partition for %q\n", path) - fis, err := ioutil.ReadDir(path) + + // Open partition so index can recover from entries not in the snapshot. + partitionID, err := strconv.Atoi(filepath.Base(path)) if err != nil { - return err + return fmt.Errorf("cannot parse partition id from path: %s", path) } - - indexPath := filepath.Join(path, "index") - index := tsdb.NewSeriesIndex(indexPath) - if err := index.Open(); err != nil { - return err + p := tsdb.NewSeriesPartition(partitionID, path, nil) + if err := p.Open(); err != nil { + return fmt.Errorf("cannot open partition: path=%s err=%s", path, err) } - defer index.Close() + defer p.Close() - for _, fi := range fis { - segmentID, err := tsdb.ParseSeriesSegmentFilename(fi.Name()) - if err != nil { - continue // skip non-segment file. - } + // Loop over segments and compact. + indexPath := p.IndexPath() + var segmentPaths []string + for _, segment := range p.Segments() { + fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", segment.Path(), segment.ID()) - segmentPath := filepath.Join(path, fi.Name()) - fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", path, segmentID) - - segment := tsdb.NewSeriesSegment(segmentID, path) - if err = segment.Open(); err != nil { - return err - } else if err := segment.CompactToPath(segmentPath+tmpExt, index); err != nil { - return err - } else if err := segment.Close(); err != nil { + if err := segment.CompactToPath(segment.Path()+tmpExt, p.Index()); err != nil { return err } + segmentPaths = append(segmentPaths, segment.Path()) + } + + // Close partition. + if err := p.Close(); err != nil { + return err } // Remove the old segment files and replace with new ones. - for _, fi := range fis { - dst := filepath.Join(path, fi.Name()) + for _, dst := range segmentPaths { src := dst + tmpExt fmt.Fprintf(cmd.Stdout, "renaming new segment %q to %q\n", src, dst) @@ -224,13 +222,13 @@ func (cmd *Command) compactSeriesFilePartition(path string) error { } } - // Remove index file and then rebuild index + // Remove index file so it will be rebuilt when reopened. fmt.Fprintln(cmd.Stdout, "removing index file", indexPath) if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality return err } - return index.Close() + return nil } // seriesFilePartitionPaths returns the paths to each partition in the series file. @@ -243,7 +241,7 @@ func (cmd *Command) seriesFilePartitionPaths(path string) ([]string, error) { var paths []string for _, partition := range sfile.Partitions() { - paths = append(paths, partition.IndexPath()) + paths = append(paths, partition.Path()) } if err := sfile.Close(); err != nil { return nil, err diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index 7890a2e368..b97b44a2fa 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -423,7 +423,7 @@ func (p *SeriesPartition) EnableCompactions() { } func (p *SeriesPartition) compactionsEnabled() bool { - return p.compactionsDisabled == 0 + return p.compactionLimiter != nil && p.compactionsDisabled == 0 } // AppendSeriesIDs returns a list of all series ids.