fix(tsdb): Fix -compact-series-file flag
parent
a8cf52ba0e
commit
7a9eb1420c
|
@ -145,6 +145,7 @@ func (cmd *Command) compactDatabaseSeriesFile(dbName, path string) error {
|
||||||
for _, path := range paths {
|
for _, path := range paths {
|
||||||
pathCh <- path
|
pathCh <- path
|
||||||
}
|
}
|
||||||
|
close(pathCh)
|
||||||
|
|
||||||
// Concurrently process each partition in the series file
|
// Concurrently process each partition in the series file
|
||||||
var g errgroup.Group
|
var g errgroup.Group
|
||||||
|
@ -182,40 +183,37 @@ func (cmd *Command) compactSeriesFilePartition(path string) error {
|
||||||
const tmpExt = ".tmp"
|
const tmpExt = ".tmp"
|
||||||
|
|
||||||
fmt.Fprintf(cmd.Stdout, "processing partition for %q\n", path)
|
fmt.Fprintf(cmd.Stdout, "processing partition for %q\n", path)
|
||||||
fis, err := ioutil.ReadDir(path)
|
|
||||||
|
// Open partition so index can recover from entries not in the snapshot.
|
||||||
|
partitionID, err := strconv.Atoi(filepath.Base(path))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("cannot parse partition id from path: %s", path)
|
||||||
}
|
}
|
||||||
|
p := tsdb.NewSeriesPartition(partitionID, path, nil)
|
||||||
indexPath := filepath.Join(path, "index")
|
if err := p.Open(); err != nil {
|
||||||
index := tsdb.NewSeriesIndex(indexPath)
|
return fmt.Errorf("cannot open partition: path=%s err=%s", path, err)
|
||||||
if err := index.Open(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
defer index.Close()
|
defer p.Close()
|
||||||
|
|
||||||
for _, fi := range fis {
|
// Loop over segments and compact.
|
||||||
segmentID, err := tsdb.ParseSeriesSegmentFilename(fi.Name())
|
indexPath := p.IndexPath()
|
||||||
if err != nil {
|
var segmentPaths []string
|
||||||
continue // skip non-segment file.
|
for _, segment := range p.Segments() {
|
||||||
}
|
fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", segment.Path(), segment.ID())
|
||||||
|
|
||||||
segmentPath := filepath.Join(path, fi.Name())
|
if err := segment.CompactToPath(segment.Path()+tmpExt, p.Index()); err != nil {
|
||||||
fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", path, segmentID)
|
|
||||||
|
|
||||||
segment := tsdb.NewSeriesSegment(segmentID, path)
|
|
||||||
if err = segment.Open(); err != nil {
|
|
||||||
return err
|
|
||||||
} else if err := segment.CompactToPath(segmentPath+tmpExt, index); err != nil {
|
|
||||||
return err
|
|
||||||
} else if err := segment.Close(); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
segmentPaths = append(segmentPaths, segment.Path())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close partition.
|
||||||
|
if err := p.Close(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the old segment files and replace with new ones.
|
// Remove the old segment files and replace with new ones.
|
||||||
for _, fi := range fis {
|
for _, dst := range segmentPaths {
|
||||||
dst := filepath.Join(path, fi.Name())
|
|
||||||
src := dst + tmpExt
|
src := dst + tmpExt
|
||||||
|
|
||||||
fmt.Fprintf(cmd.Stdout, "renaming new segment %q to %q\n", src, dst)
|
fmt.Fprintf(cmd.Stdout, "renaming new segment %q to %q\n", src, dst)
|
||||||
|
@ -224,13 +222,13 @@ func (cmd *Command) compactSeriesFilePartition(path string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove index file and then rebuild index
|
// Remove index file so it will be rebuilt when reopened.
|
||||||
fmt.Fprintln(cmd.Stdout, "removing index file", indexPath)
|
fmt.Fprintln(cmd.Stdout, "removing index file", indexPath)
|
||||||
if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality
|
if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return index.Close()
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// seriesFilePartitionPaths returns the paths to each partition in the series file.
|
// seriesFilePartitionPaths returns the paths to each partition in the series file.
|
||||||
|
@ -243,7 +241,7 @@ func (cmd *Command) seriesFilePartitionPaths(path string) ([]string, error) {
|
||||||
|
|
||||||
var paths []string
|
var paths []string
|
||||||
for _, partition := range sfile.Partitions() {
|
for _, partition := range sfile.Partitions() {
|
||||||
paths = append(paths, partition.IndexPath())
|
paths = append(paths, partition.Path())
|
||||||
}
|
}
|
||||||
if err := sfile.Close(); err != nil {
|
if err := sfile.Close(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -423,7 +423,7 @@ func (p *SeriesPartition) EnableCompactions() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SeriesPartition) compactionsEnabled() bool {
|
func (p *SeriesPartition) compactionsEnabled() bool {
|
||||||
return p.compactionsDisabled == 0
|
return p.compactionLimiter != nil && p.compactionsDisabled == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendSeriesIDs returns a list of all series ids.
|
// AppendSeriesIDs returns a list of all series ids.
|
||||||
|
|
Loading…
Reference in New Issue