Merge pull request #17185 from influxdata/series-compaction

feat(storage): Series File Compaction
pull/17218/head
Ben Johnson 2020-03-12 08:01:49 -06:00 committed by GitHub
commit 889c58c17f
6 changed files with 409 additions and 1 deletion
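This change adds an offline influxd inspect compact-series-file subcommand that rewrites each series file partition to drop tombstoned (deleted) series and then rebuilds the partition indexes. A typical invocation, assuming the default engine paths (the concurrency value here is illustrative):

influxd inspect compact-series-file --concurrency 4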

View File

@ -0,0 +1,192 @@
package inspect
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/tsdb"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
var compactSeriesFileFlags = struct {
// Standard input/output, overridden for testing.
Stderr io.Writer
Stdout io.Writer
// Data path options
SeriesFilePath string // optional. Defaults to <engine_path>/engine/_series
IndexPath string // optional. Defaults to <engine_path>/engine/index
Concurrency int // optional. Defaults to GOMAXPROCS(0)
}{
Stderr: os.Stderr,
Stdout: os.Stdout,
}
// NewCompactSeriesFileCommand returns a new instance of Command with default settings applied.
func NewCompactSeriesFileCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "compact-series-file",
Short: "Compacts the series file to remove deleted series.",
Long: `This command will compact the series file by removing deleted series.`,
RunE: RunCompactSeriesFile,
}
home, _ := fs.InfluxDir()
defaultPath := filepath.Join(home, "engine")
defaultSFilePath := filepath.Join(defaultPath, storage.DefaultSeriesFileDirectoryName)
defaultIndexPath := filepath.Join(defaultPath, storage.DefaultIndexDirectoryName)
cmd.Flags().StringVar(&compactSeriesFileFlags.SeriesFilePath, "sfile-path", defaultSFilePath, "Path to the Series File directory. Defaults to "+defaultSFilePath)
cmd.Flags().StringVar(&compactSeriesFileFlags.IndexPath, "tsi-path", defaultIndexPath, "Path to the TSI index directory. Defaults to "+defaultIndexPath)
cmd.Flags().IntVar(&compactSeriesFileFlags.Concurrency, "concurrency", runtime.GOMAXPROCS(0), "Number of workers to dedicate to compaction. Defaults to GOMAXPROCS. Max 8.")
cmd.SetOutput(compactSeriesFileFlags.Stdout)
return cmd
}
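For reference, the subcommand can also be driven programmatically through the standard cobra API, for example from a test. A minimal sketch; the import path for the inspect package is assumed from the package clause above:

package main

import (
	"log"

	"github.com/influxdata/influxdb/cmd/influxd/inspect" // assumed location of the inspect package
)

func main() {
	cmd := inspect.NewCompactSeriesFileCommand()
	cmd.SetArgs([]string{"--concurrency", "2"}) // any of the flags registered above
	if err := cmd.Execute(); err != nil {
		log.Fatal(err)
	}
}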
// RunCompactSeriesFile executes the run command for CompactSeriesFile.
func RunCompactSeriesFile(cmd *cobra.Command, args []string) error {
// Verify the user actually wants to run as root.
if isRoot() {
fmt.Fprintln(compactSeriesFileFlags.Stdout, "You are currently running as root. This will compact your")
fmt.Fprintln(compactSeriesFileFlags.Stdout, "series file with root ownership and will be inaccessible")
fmt.Fprintln(compactSeriesFileFlags.Stdout, "if you run influxd as a non-root user. You should run")
fmt.Fprintln(compactSeriesFileFlags.Stdout, "influxd inspect compact-series-file as the same user you are running influxd.")
fmt.Fprint(compactSeriesFileFlags.Stdout, "Are you sure you want to continue? (y/N): ")
var answer string
if fmt.Scanln(&answer); !strings.HasPrefix(strings.TrimSpace(strings.ToLower(answer)), "y") {
return fmt.Errorf("operation aborted")
}
}
paths, err := seriesFilePartitionPaths(compactSeriesFileFlags.SeriesFilePath)
if err != nil {
return err
}
// Build input channel.
pathCh := make(chan string, len(paths))
for _, path := range paths {
pathCh <- path
}
close(pathCh)
// Limit maximum concurrency to the total number of series file partitions.
concurrency := compactSeriesFileFlags.Concurrency
if concurrency > tsdb.SeriesFilePartitionN {
concurrency = tsdb.SeriesFilePartitionN
}
// Concurrently process each partition in the series file
var g errgroup.Group
for i := 0; i < concurrency; i++ {
g.Go(func() error {
for path := range pathCh {
if err := compactSeriesFilePartition(path); err != nil {
return err
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
// Build new series file indexes
sfile := tsdb.NewSeriesFile(compactSeriesFileFlags.SeriesFilePath)
if err = sfile.Open(context.Background()); err != nil {
return err
}
defer sfile.Close()
compactor := tsdb.NewSeriesPartitionCompactor()
for _, partition := range sfile.Partitions() {
duration, err := compactor.Compact(partition)
if err != nil {
return err
}
fmt.Fprintf(compactSeriesFileFlags.Stdout, "compacted %s in %s\n", partition.Path(), duration)
}
return nil
}
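The concurrency scheme above — pre-fill a buffered channel, close it, and let a fixed number of errgroup workers drain it — is worth seeing in isolation. A minimal runnable sketch with illustrative names (the real code processes partition paths via compactSeriesFilePartition):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	paths := []string{"00", "01", "02", "03"} // stand-ins for partition paths
	pathCh := make(chan string, len(paths))
	for _, p := range paths {
		pathCh <- p
	}
	close(pathCh) // each worker exits once the channel drains

	var g errgroup.Group
	for i := 0; i < 2; i++ { // worker count, like the --concurrency flag
		g.Go(func() error {
			for p := range pathCh {
				fmt.Println("compacting", p)
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil { // returns the first worker error, if any
		fmt.Println("error:", err)
	}
}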
func compactSeriesFilePartition(path string) error {
const tmpExt = ".tmp"
fmt.Fprintf(compactSeriesFileFlags.Stdout, "processing partition for %q\n", path)
// Open partition so index can recover from entries not in the snapshot.
partitionID, err := strconv.Atoi(filepath.Base(path))
if err != nil {
return fmt.Errorf("cannot parse partition id from path: %s", path)
}
p := tsdb.NewSeriesPartition(partitionID, path)
if err := p.Open(); err != nil {
return fmt.Errorf("cannot open partition: path=%s err=%s", path, err)
}
defer p.Close()
// Loop over segments and compact.
indexPath := p.IndexPath()
var segmentPaths []string
for _, segment := range p.Segments() {
fmt.Fprintf(compactSeriesFileFlags.Stdout, "processing segment %q %d\n", segment.Path(), segment.ID())
if err := segment.CompactToPath(segment.Path()+tmpExt, p.Index()); err != nil {
return err
}
segmentPaths = append(segmentPaths, segment.Path())
}
// Close partition.
if err := p.Close(); err != nil {
return err
}
// Remove the old segment files and replace with new ones.
for _, dst := range segmentPaths {
src := dst + tmpExt
fmt.Fprintf(compactSeriesFileFlags.Stdout, "renaming new segment %q to %q\n", src, dst)
if err = os.Rename(src, dst); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("serious failure. Please rebuild index and series file: %v", err)
}
}
// Remove index file so it will be rebuilt when reopened.
fmt.Fprintln(compactSeriesFileFlags.Stdout, "removing index file", indexPath)
if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality
return err
}
return nil
}
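compactSeriesFilePartition relies on a write-to-temp-then-rename scheme: new segments are written beside the originals with a .tmp suffix and only swapped in once every segment has compacted cleanly, so an interrupted run never leaves a half-written segment under a live name. The pattern in isolation, as a hedged sketch (replaceFile is a hypothetical helper, not API from this change):

package main

import (
	"log"
	"os"
)

// replaceFile writes new contents to dst+".tmp", then swaps the temp file
// into place; os.Rename replaces dst in a single step on POSIX systems.
func replaceFile(dst string, write func(path string) error) error {
	tmp := dst + ".tmp"
	if err := write(tmp); err != nil {
		return err
	}
	return os.Rename(tmp, dst)
}

func main() {
	err := replaceFile("data.seg", func(path string) error {
		return os.WriteFile(path, []byte("compacted"), 0o666)
	})
	if err != nil {
		log.Fatal(err)
	}
}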
// seriesFilePartitionPaths returns the paths to each partition in the series file.
func seriesFilePartitionPaths(path string) ([]string, error) {
sfile := tsdb.NewSeriesFile(path)
if err := sfile.Open(context.Background()); err != nil {
return nil, err
}
var paths []string
for _, partition := range sfile.Partitions() {
paths = append(paths, partition.Path())
}
if err := sfile.Close(); err != nil {
return nil, err
}
return paths, nil
}

View File

@ -15,6 +15,7 @@ func NewCommand() *cobra.Command {
// If a new sub-command is created, it must be added here
subCommands := []*cobra.Command{
NewBuildTSICommand(),
NewCompactSeriesFileCommand(),
NewExportBlocksCommand(),
NewExportIndexCommand(),
NewReportTSMCommand(),

View File

@ -212,6 +212,18 @@ func (f *SeriesFile) DisableCompactions() {
}
}
// FileSize returns the size of all partitions, in bytes.
func (f *SeriesFile) FileSize() (n int64, err error) {
for _, p := range f.partitions {
v, err := p.FileSize()
if err != nil {
return n, err
}
n += v
}
return n, nil
}
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. It overwrites
// the collection's Keys and SeriesIDs fields. The collection's SeriesIDs slice will have IDs for
// every name+tags, creating new series IDs as needed. If any SeriesID is zero, then a type

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"golang.org/x/sync/errgroup"
)
func TestParseSeriesKeyInto(t *testing.T) {
@ -271,6 +272,148 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) {
}
}
func TestSeriesFile_Compaction(t *testing.T) {
const n = 1000
sfile := MustOpenSeriesFile()
defer sfile.Close()
// Generate a bunch of keys.
var collection tsdb.SeriesCollection
for i := 0; i < n; i++ {
collection.Names = append(collection.Names, []byte("cpu"))
collection.Tags = append(collection.Tags, models.NewTags(map[string]string{"region": fmt.Sprintf("r%d", i)}))
collection.Types = append(collection.Types, models.Integer)
}
// Add all to the series file.
err := sfile.CreateSeriesListIfNotExists(&collection)
if err != nil {
t.Fatal(err)
}
// Delete a subset of keys.
for i := 0; i < n; i++ {
if i%10 != 0 {
continue
}
if id := sfile.SeriesID(collection.Names[i], collection.Tags[i], nil); id.IsZero() {
t.Fatal("expected series id")
} else if err := sfile.DeleteSeriesID(id); err != nil {
t.Fatal(err)
}
}
// Compute total size of all series data.
origSize, err := sfile.FileSize()
if err != nil {
t.Fatal(err)
}
// Compact all segments.
var paths []string
for _, p := range sfile.Partitions() {
for _, ss := range p.Segments() {
if err := ss.CompactToPath(ss.Path()+".tmp", p.Index()); err != nil {
t.Fatal(err)
}
paths = append(paths, ss.Path())
}
}
// Close the series file.
if err := sfile.SeriesFile.Close(); err != nil {
t.Fatal(err)
}
// Overwrite files.
for _, path := range paths {
if err := os.Rename(path+".tmp", path); err != nil {
t.Fatal(err)
}
}
// Reopen the series file.
sfile.SeriesFile = tsdb.NewSeriesFile(sfile.SeriesFile.Path())
if err := sfile.SeriesFile.Open(context.Background()); err != nil {
t.Fatal(err)
}
// Ensure series status is correct.
for i := 0; i < n; i++ {
if id := sfile.SeriesID(collection.Names[i], collection.Tags[i], nil); id.IsZero() {
continue
} else if got, want := sfile.IsDeleted(id), (i%10) == 0; got != want {
t.Fatalf("IsDeleted(%d)=%v, want %v", id, got, want)
}
}
// Verify new size is smaller.
newSize, err := sfile.FileSize()
if err != nil {
t.Fatal(err)
} else if newSize >= origSize {
t.Fatalf("expected new size (%d) to be smaller than original size (%d)", newSize, origSize)
}
t.Logf("original size: %d, new size: %d", origSize, newSize)
}
var cachedCompactionSeriesFile *SeriesFile
func BenchmarkSeriesFile_Compaction(b *testing.B) {
const n = 1000000
if cachedCompactionSeriesFile == nil {
sfile := MustOpenSeriesFile()
// Generate a bunch of keys.
ids := make([]tsdb.SeriesID, n)
for i := 0; i < n; i++ {
collection := &tsdb.SeriesCollection{
Names: [][]byte{[]byte("cpu")},
Tags: []models.Tags{models.NewTags(map[string]string{"region": fmt.Sprintf("r%d", i)})},
Types: []models.FieldType{models.Integer},
}
if err := sfile.CreateSeriesListIfNotExists(collection); err != nil {
b.Fatal(err)
} else if ids[i] = sfile.SeriesID(collection.Names[0], collection.Tags[0], nil); ids[i].IsZero() {
b.Fatalf("expected series id: i=%d", i)
}
}
// Delete a subset of keys.
for i := 0; i < len(ids); i += 10 {
if err := sfile.DeleteSeriesID(ids[i]); err != nil {
b.Fatal(err)
}
}
cachedCompactionSeriesFile = sfile
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Compact all segments in parallel.
var g errgroup.Group
for _, p := range cachedCompactionSeriesFile.Partitions() {
for _, segment := range p.Segments() {
p, segment := p, segment // rebind so each goroutine captures this iteration's values (required before Go 1.22)
g.Go(func() error {
return segment.CompactToPath(segment.Path()+".tmp", p.Index())
})
}
}
if err := g.Wait(); err != nil {
b.Fatal(err)
}
}
}
// Series represents name/tagset pairs that are used in testing.
type Series struct {
Name []byte

View File

@ -62,7 +62,7 @@ func NewSeriesPartition(id int, path string) *SeriesPartition {
closing: make(chan struct{}),
CompactThreshold: DefaultSeriesPartitionCompactThreshold,
LargeWriteThreshold: DefaultLargeSeriesWriteThreshold,
- tracker: newSeriesPartitionTracker(newSeriesFileMetrics(nil), nil),
+ tracker: newSeriesPartitionTracker(newSeriesFileMetrics(nil), prometheus.Labels{"series_file_partition": fmt.Sprint(id)}),
Logger: zap.NewNop(),
seq: uint64(id) + 1,
}
@ -184,6 +184,24 @@ func (p *SeriesPartition) Path() string { return p.path }
// IndexPath returns the path to the series index.
func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "index") }
// Index returns the partition's index.
func (p *SeriesPartition) Index() *SeriesIndex { return p.index }
// Segments returns the segments in the partition.
func (p *SeriesPartition) Segments() []*SeriesSegment { return p.segments }
// FileSize returns the size of all segments in the partition, in bytes.
func (p *SeriesPartition) FileSize() (n int64, err error) {
for _, ss := range p.segments {
fi, err := os.Stat(ss.Path())
if err != nil {
return 0, err
}
n += fi.Size()
}
return n, nil
}
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The ids parameter is modified to contain series IDs for all keys belonging to this partition.
// If the type does not match the existing type for the key, a zero id is stored.

View File

@ -184,6 +184,8 @@ func (s *SeriesSegment) Size() int64 { return int64(s.size) }
// Slice returns a byte slice starting at pos.
func (s *SeriesSegment) Slice(pos uint32) []byte { return s.data[pos:] }
func (s *SeriesSegment) Path() string { return s.path }
// WriteLogEntry writes entry data into the segment.
// Returns the offset of the beginning of the entry.
func (s *SeriesSegment) WriteLogEntry(data []byte) (offset int64, err error) {
@ -264,6 +266,46 @@ func (s *SeriesSegment) Clone() *SeriesSegment {
}
}
// CompactToPath rewrites the segment to a new file and removes tombstoned entries.
func (s *SeriesSegment) CompactToPath(path string, index *SeriesIndex) error {
dst, err := CreateSeriesSegment(s.id, path)
if err != nil {
return err
}
defer dst.Close()
if err = dst.InitForWrite(); err != nil {
return err
}
// Iterate through the segment, copying to the new segment any entries
// that still exist in the index.
var buf []byte
if err = s.ForEachEntry(func(flag uint8, id SeriesIDTyped, _ int64, key []byte) error {
if index.IsDeleted(id.SeriesID()) {
return nil // series id has been deleted from index
} else if flag == SeriesEntryTombstoneFlag {
return fmt.Errorf("[series id %d]: tombstone entry but exists in index", id)
}
// copy entry over to new segment
buf = AppendSeriesEntry(buf[:0], flag, id, key)
_, err := dst.WriteLogEntry(buf)
return err
}); err != nil {
return err
}
// Close the segment, then truncate the file to the bytes actually written.
size := dst.size
if err := dst.Close(); err != nil {
return err
} else if err := os.Truncate(dst.path, int64(size)); err != nil {
return err
}
return nil
}
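For orientation, both the inspect command and the test above drive CompactToPath the same way: compact every segment to a .tmp side file, close the partition to release its file handles, then rename the new files over the old ones. A hedged sketch of that call pattern (compactPartition is a hypothetical wrapper, not API from this change):

package example

import (
	"os"

	"github.com/influxdata/influxdb/tsdb"
)

// compactPartition compacts each segment of an open partition to a ".tmp"
// file, closes the partition, and swaps the compacted files into place.
func compactPartition(p *tsdb.SeriesPartition) error {
	var paths []string
	for _, seg := range p.Segments() {
		if err := seg.CompactToPath(seg.Path()+".tmp", p.Index()); err != nil {
			return err
		}
		paths = append(paths, seg.Path())
	}
	if err := p.Close(); err != nil {
		return err
	}
	for _, path := range paths {
		if err := os.Rename(path+".tmp", path); err != nil {
			return err
		}
	}
	return nil
}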
// CloneSeriesSegments returns a copy of a slice of segments.
func CloneSeriesSegments(a []*SeriesSegment) []*SeriesSegment {
other := make([]*SeriesSegment, len(a))