From 627b6f86bbc2ac7219b15aa3e115407d6d966eff Mon Sep 17 00:00:00 2001
From: Ben Johnson <benbjohnson@yahoo.com>
Date: Mon, 9 Mar 2020 15:16:09 -0600
Subject: [PATCH] feat(storage): Series file compaction

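Adds an `influxd inspect compact-series-file` command that rewrites each
series file partition, dropping entries for deleted (tombstoned) series,
and then removes the partition indexes so they are rebuilt the next time
the series file is opened.

A sketch of a typical invocation; the paths shown assume the default
Influx home directory (~/.influxdbv2 here), so adjust them to your
engine path:

    influxd inspect compact-series-file \
        --sfile-path ~/.influxdbv2/engine/_series \
        --tsi-path ~/.influxdbv2/engine/index \
        --concurrency 4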
---
 cmd/influxd/inspect/compact_series_file.go | 195 +++++++++++++++++++++
 cmd/influxd/inspect/inspect.go             |   1 +
 tsdb/series_file.go                        |  12 ++
 tsdb/series_file_test.go                   | 146 +++++++++++++++
 tsdb/series_partition.go                   |  20 ++-
 tsdb/series_segment.go                     |  43 +++++
 6 files changed, 416 insertions(+), 1 deletion(-)
 create mode 100644 cmd/influxd/inspect/compact_series_file.go

diff --git a/cmd/influxd/inspect/compact_series_file.go b/cmd/influxd/inspect/compact_series_file.go
new file mode 100644
index 0000000000..11ac617b93
--- /dev/null
+++ b/cmd/influxd/inspect/compact_series_file.go
@@ -0,0 +1,195 @@
+package inspect
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strconv"
+	"strings"
+
+	"github.com/influxdata/influxdb/internal/fs"
+	"github.com/influxdata/influxdb/storage"
+	"github.com/influxdata/influxdb/tsdb"
+	"github.com/spf13/cobra"
+	"golang.org/x/sync/errgroup"
+)
+
+var compactSeriesFileFlags = struct {
+	// Standard error and output streams, overridden for testing.
+	Stderr io.Writer
+	Stdout io.Writer
+
+	// Data path options
+	SeriesFilePath string // optional. Defaults to <influx_dir>/engine/_series
+	IndexPath      string // optional. Defaults to <influx_dir>/engine/index
+
+	Concurrency int // optional. Defaults to GOMAXPROCS(0)
+}{
+	Stderr: os.Stderr,
+	Stdout: os.Stdout,
+}
+
+// NewCompactSeriesFileCommand returns a new instance of Command with default settings applied.
+func NewCompactSeriesFileCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "compact-series-file",
+		Short: "Compacts the series file to remove deleted series.",
+		Long:  `This command will compact the series file by removing deleted series.`,
+		RunE:  RunCompactSeriesFile,
+	}
+
+	home, _ := fs.InfluxDir()
+	defaultPath := filepath.Join(home, "engine")
+	defaultSFilePath := filepath.Join(defaultPath, storage.DefaultSeriesFileDirectoryName)
+	defaultIndexPath := filepath.Join(defaultPath, storage.DefaultIndexDirectoryName)
+
+	cmd.Flags().StringVar(&compactSeriesFileFlags.SeriesFilePath, "sfile-path", defaultSFilePath, "Path to the Series File directory. Defaults to "+defaultSFilePath)
+	cmd.Flags().StringVar(&compactSeriesFileFlags.IndexPath, "tsi-path", defaultIndexPath, "Path to the TSI index directory. Defaults to "+defaultIndexPath)
+
+	cmd.Flags().IntVar(&compactSeriesFileFlags.Concurrency, "concurrency", runtime.GOMAXPROCS(0), "Number of workers to dedicate to compaction. Defaults to GOMAXPROCS. Max 8.")
+
+	cmd.SetOutput(compactSeriesFileFlags.Stdout)
+
+	return cmd
+}
+
+// RunCompactSeriesFile executes the run command for CompactSeriesFile.
+func RunCompactSeriesFile(cmd *cobra.Command, args []string) error {
+	// Verify the user actually wants to run as root.
+	if isRoot() {
+		fmt.Fprintln(compactSeriesFileFlags.Stdout, "You are currently running as root. This will compact your")
+		fmt.Fprintln(compactSeriesFileFlags.Stdout, "series file with root ownership, making it inaccessible")
+		fmt.Fprintln(compactSeriesFileFlags.Stdout, "if you later run influxd as a non-root user. You should run")
+		fmt.Fprintln(compactSeriesFileFlags.Stdout, "influxd inspect compact-series-file as the same user that runs influxd.")
+		fmt.Fprint(compactSeriesFileFlags.Stdout, "Are you sure you want to continue? (y/N): ")
+		var answer string
+		if fmt.Scanln(&answer); !strings.HasPrefix(strings.TrimSpace(strings.ToLower(answer)), "y") {
+			return fmt.Errorf("operation aborted")
+		}
+	}
+
+	paths, err := seriesFilePartitionPaths(compactSeriesFileFlags.SeriesFilePath)
+	if err != nil {
+		return err
+	}
+
+	// Build input channel.
+	pathCh := make(chan string, len(paths))
+	for _, path := range paths {
+		pathCh <- path
+	}
+	close(pathCh)
+
+	// Limit maximum concurrency to the total number of series file partitions.
+	concurrency := compactSeriesFileFlags.Concurrency
+	if concurrency > tsdb.SeriesFilePartitionN {
+		concurrency = tsdb.SeriesFilePartitionN
+	}
+
+	// Concurrently process each partition in the series file
+	var g errgroup.Group
+	for i := 0; i < concurrency; i++ {
+		g.Go(func() error {
+			for path := range pathCh {
+				if err := compactSeriesFilePartition(path); err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return err
+	}
+
+	// Build new series file indexes
+	sfile := tsdb.NewSeriesFile(compactSeriesFileFlags.SeriesFilePath)
+	if err = sfile.Open(context.Background()); err != nil {
+		return err
+	}
+	defer sfile.Close()
+
+	compactor := tsdb.NewSeriesPartitionCompactor()
+	for _, partition := range sfile.Partitions() {
+		duration, err := compactor.Compact(partition)
+		if err != nil {
+			return err
+		}
+		fmt.Fprintf(compactSeriesFileFlags.Stdout, "compacted %s in %s\n", partition.Path(), duration)
+	}
+	return nil
+}
+
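+// compactSeriesFilePartition rewrites each segment of the partition without
+// its tombstoned series, then removes the index so it is rebuilt on reopen.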
+func compactSeriesFilePartition(path string) error {
+	const tmpExt = ".tmp"
+
+	fmt.Fprintf(compactSeriesFileFlags.Stdout, "processing partition %q\n", path)
+
+	// Open partition so index can recover from entries not in the snapshot.
+	partitionID, err := strconv.Atoi(filepath.Base(path))
+	if err != nil {
+		return fmt.Errorf("cannot parse partition id from path: %s", path)
+	}
+	p := tsdb.NewSeriesPartition(partitionID, path)
+	if err := p.Open(); err != nil {
+		return fmt.Errorf("cannot open partition: path=%s err=%s", path, err)
+	}
+	defer p.Close()
+
+	// Loop over segments and compact.
+	indexPath := p.IndexPath()
+	var segmentPaths []string
+	for _, segment := range p.Segments() {
+		fmt.Fprintf(compactSeriesFileFlags.Stdout, "processing segment %q (id %d)\n", segment.Path(), segment.ID())
+
+		if err := segment.CompactToPath(segment.Path()+tmpExt, p.Index()); err != nil {
+			return err
+		}
+		segmentPaths = append(segmentPaths, segment.Path())
+	}
+
+	// Close partition.
+	if err := p.Close(); err != nil {
+		return err
+	}
+
+	// Remove the old segment files and replace with new ones.
+	for _, dst := range segmentPaths {
+		src := dst + tmpExt
+
+		fmt.Fprintf(compactSeriesFileFlags.Stdout, "renaming new segment %q to %q\n", src, dst)
+		if err = os.Rename(src, dst); err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("serious failure, please rebuild the index and series file: %v", err)
+		}
+	}
+
+	// Remove index file so it will be rebuilt when reopened.
+	fmt.Fprintln(compactSeriesFileFlags.Stdout, "removing index file", indexPath)
+	if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality
+		return err
+	}
+
+	return nil
+}
+
+// seriesFilePartitionPaths returns the paths to each partition in the series file.
+func seriesFilePartitionPaths(path string) ([]string, error) {
+	sfile := tsdb.NewSeriesFile(path)
+	if err := sfile.Open(context.Background()); err != nil {
+		return nil, err
+	}
+
+	var paths []string
+	for _, partition := range sfile.Partitions() {
+		paths = append(paths, partition.Path())
+	}
+	if err := sfile.Close(); err != nil {
+		return nil, err
+	}
+	return paths, nil
+}
diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go
index c701a095ec..368b65696a 100644
--- a/cmd/influxd/inspect/inspect.go
+++ b/cmd/influxd/inspect/inspect.go
@@ -15,6 +15,7 @@ func NewCommand() *cobra.Command {
 	// If a new sub-command is created, it must be added here
 	subCommands := []*cobra.Command{
 		NewBuildTSICommand(),
+		NewCompactSeriesFileCommand(),
 		NewExportBlocksCommand(),
 		NewExportIndexCommand(),
 		NewReportTSMCommand(),
diff --git a/tsdb/series_file.go b/tsdb/series_file.go
index d7cd31f4f1..64657ae218 100644
--- a/tsdb/series_file.go
+++ b/tsdb/series_file.go
@@ -212,6 +212,18 @@ func (f *SeriesFile) DisableCompactions() {
 	}
 }
 
+// FileSize returns the total size of all partitions, in bytes.
+func (f *SeriesFile) FileSize() (n int64, err error) {
+	for _, p := range f.partitions {
+		v, err := p.FileSize()
+		if err != nil {
+			return n, err
+		}
+		n += v
+	}
+	return n, nil
+}
+
 // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. It overwrites
 // the collection's Keys and SeriesIDs fields. The collection's SeriesIDs slice will have IDs for
 // every name+tags, creating new series IDs as needed. If any SeriesID is zero, then a type
diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go
index dff8ce18de..e975c6af11 100644
--- a/tsdb/series_file_test.go
+++ b/tsdb/series_file_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/tsdb"
+	"golang.org/x/sync/errgroup"
 )
 
 func TestParseSeriesKeyInto(t *testing.T) {
@@ -271,6 +272,151 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) {
 	}
 }
 
+func TestSeriesFile_Compaction(t *testing.T) {
+	const n = 1000
+
+	sfile := MustOpenSeriesFile()
+	defer sfile.Close()
+
+	// Generate a bunch of keys.
+	var collection tsdb.SeriesCollection
+	for i := 0; i < n; i++ {
+		collection.Names = append(collection.Names, []byte("cpu"))
+		collection.Tags = append(collection.Tags, models.NewTags(map[string]string{"region": fmt.Sprintf("r%d", i)}))
+		collection.Types = append(collection.Types, models.Integer)
+	}
+
+	// Add all to the series file.
+	err := sfile.CreateSeriesListIfNotExists(&collection)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Delete a subset of keys.
+	for i := 0; i < n; i++ {
+		if i%10 != 0 {
+			continue
+		}
+
+		if id := sfile.SeriesID(collection.Names[i], collection.Tags[i], nil); id.IsZero() {
+			t.Fatal("expected series id")
+		} else if err := sfile.DeleteSeriesID(id); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Compute total size of all series data.
+	origSize, err := sfile.FileSize()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Compact all segments.
+	var paths []string
+	for _, p := range sfile.Partitions() {
+		for _, ss := range p.Segments() {
+			if err := ss.CompactToPath(ss.Path()+".tmp", p.Index()); err != nil {
+				t.Fatal(err)
+			}
+			paths = append(paths, ss.Path())
+		}
+	}
+
+	// Close the series file.
+	if err := sfile.SeriesFile.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Overwrite files.
+	for _, path := range paths {
+		if err := os.Rename(path+".tmp", path); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Reopen the series file so the index is rebuilt from the compacted segments.
+	sfile.SeriesFile = tsdb.NewSeriesFile(sfile.SeriesFile.Path())
+	if err := sfile.SeriesFile.Open(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure series status is correct.
+	for i := 0; i < n; i++ {
+		if id := sfile.SeriesID(collection.Names[i], collection.Tags[i], nil); id.IsZero() {
+			continue
+		} else if got, want := sfile.IsDeleted(id), (i%10) == 0; got != want {
+			t.Fatalf("IsDeleted(%d)=%v, want %v", id, got, want)
+		}
+	}
+
+	// Verify new size is smaller.
+	newSize, err := sfile.FileSize()
+	if err != nil {
+		t.Fatal(err)
+	} else if newSize >= origSize {
+		t.Fatalf("expected new size (%d) to be smaller than original size (%d)", newSize, origSize)
+	}
+
+	t.Logf("original size: %d, new size: %d", origSize, newSize)
+}
+
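+// cachedCompactionSeriesFile caches the populated series file across
+// benchmark runs so the expensive setup only executes once per process.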
+var cachedCompactionSeriesFile *SeriesFile
+
+func BenchmarkSeriesFile_Compaction(b *testing.B) {
+	const n = 1000000
+
+	if cachedCompactionSeriesFile == nil {
+		sfile := MustOpenSeriesFile()
+
+		// Generate a bunch of keys.
+		ids := make([]tsdb.SeriesID, n)
+		for i := 0; i < n; i++ {
+			collection := &tsdb.SeriesCollection{
+				Names: [][]byte{[]byte("cpu")},
+				Tags:  []models.Tags{models.NewTags(map[string]string{"region": fmt.Sprintf("r%d", i)})},
+				Types: []models.FieldType{models.Integer},
+			}
+
+			if err := sfile.CreateSeriesListIfNotExists(collection); err != nil {
+				b.Fatal(err)
+			} else if ids[i] = sfile.SeriesID(collection.Names[0], collection.Tags[0], nil); ids[i].IsZero() {
+				b.Fatalf("expected series id: i=%d", i)
+			}
+		}
+
+		// Delete a subset of keys.
+		for i := 0; i < len(ids); i += 10 {
+			if err := sfile.DeleteSeriesID(ids[i]); err != nil {
+				b.Fatal(err)
+			}
+		}
+
+		cachedCompactionSeriesFile = sfile
+	}
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		// Compact all segments in parallel.
+		var g errgroup.Group
+		for _, p := range cachedCompactionSeriesFile.Partitions() {
+			for _, segment := range p.Segments() {
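+				// Capture loop variables for the goroutine closure below.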
+				p, segment := p, segment
+				g.Go(func() error {
+					return segment.CompactToPath(segment.Path()+".tmp", p.Index())
+				})
+			}
+		}
+
+		if err := g.Wait(); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
 // Series represents name/tagset pairs that are used in testing.
 type Series struct {
 	Name    []byte
diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go
index 2c1e3e3c77..52ce4d0329 100644
--- a/tsdb/series_partition.go
+++ b/tsdb/series_partition.go
@@ -62,7 +62,7 @@ func NewSeriesPartition(id int, path string) *SeriesPartition {
 		closing:             make(chan struct{}),
 		CompactThreshold:    DefaultSeriesPartitionCompactThreshold,
 		LargeWriteThreshold: DefaultLargeSeriesWriteThreshold,
-		tracker:             newSeriesPartitionTracker(newSeriesFileMetrics(nil), nil),
+		tracker:             newSeriesPartitionTracker(newSeriesFileMetrics(nil), prometheus.Labels{"series_file_partition": fmt.Sprint(id)}),
 		Logger:              zap.NewNop(),
 		seq:                 uint64(id) + 1,
 	}
@@ -184,6 +184,24 @@ func (p *SeriesPartition) Path() string { return p.path }
 // IndexPath returns the path to the series index.
 func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "index") }
 
+// Index returns the partition's index.
+func (p *SeriesPartition) Index() *SeriesIndex { return p.index }
+
+// Segments returns the segments in the partition.
+func (p *SeriesPartition) Segments() []*SeriesSegment { return p.segments }
+
+// FileSize returns the size of all segments in the partition, in bytes.
+func (p *SeriesPartition) FileSize() (n int64, err error) {
+	for _, ss := range p.segments {
+		fi, err := os.Stat(ss.Path())
+		if err != nil {
+			return 0, err
+		}
+		n += fi.Size()
+	}
+	return n, nil
+}
+
 // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
 // The ids parameter is modified to contain series IDs for all keys belonging to this partition.
 // If the type does not match the existing type for the key, a zero id is stored.
diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go
index b11838a726..93e4fa2afe 100644
--- a/tsdb/series_segment.go
+++ b/tsdb/series_segment.go
@@ -184,6 +184,9 @@ func (s *SeriesSegment) Size() int64 { return int64(s.size) }
 // Slice returns a byte slice starting at pos.
 func (s *SeriesSegment) Slice(pos uint32) []byte { return s.data[pos:] }
 
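+// Path returns the path to the segment file.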
+func (s *SeriesSegment) Path() string { return s.path }
+
 // WriteLogEntry writes entry data into the segment.
 // Returns the offset of the beginning of the entry.
 func (s *SeriesSegment) WriteLogEntry(data []byte) (offset int64, err error) {
@@ -264,6 +267,46 @@ func (s *SeriesSegment) Clone() *SeriesSegment {
 	}
 }
 
+// CompactToPath rewrites the segment to a new file and removes tombstoned entries.
+func (s *SeriesSegment) CompactToPath(path string, index *SeriesIndex) error {
+	dst, err := CreateSeriesSegment(s.id, path)
+	if err != nil {
+		return err
+	}
+	defer dst.Close()
+
+	if err = dst.InitForWrite(); err != nil {
+		return err
+	}
+
+	// Iterate over the segment, copying into the new segment only those
+	// entries that still exist in the index.
+	var buf []byte
+	if err = s.ForEachEntry(func(flag uint8, id SeriesIDTyped, _ int64, key []byte) error {
+		if index.IsDeleted(id.SeriesID()) {
+			return nil // series id has been deleted from index
+		} else if flag == SeriesEntryTombstoneFlag {
+			return fmt.Errorf("[series id %d]: tombstone entry but exists in index", id)
+		}
+
+		// copy entry over to new segment
+		buf = AppendSeriesEntry(buf[:0], flag, id, key)
+		_, err := dst.WriteLogEntry(buf)
+		return err
+	}); err != nil {
+		return err
+	}
+
+	// Close the segment and truncate the file to its written size.
+	size := dst.size
+	if err := dst.Close(); err != nil {
+		return err
+	} else if err := os.Truncate(dst.path, int64(size)); err != nil {
+		return err
+	}
+	return nil
+}
+
 // CloneSeriesSegments returns a copy of a slice of segments.
 func CloneSeriesSegments(a []*SeriesSegment) []*SeriesSegment {
 	other := make([]*SeriesSegment, len(a))