From 0ec2736f234287dda96560374f190c3bd2ea3bee Mon Sep 17 00:00:00 2001
From: Ben Johnson <benbjohnson@yahoo.com>
Date: Tue, 29 Aug 2017 10:12:43 -0600
Subject: [PATCH] Incrementally rebuild tsi bloom filters.

---
 tsdb/index/tsi1/file_set.go | 85 ++++++++++++++++++++++++++-----------
 tsdb/index/tsi1/index.go    |  6 +--
 2 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go
index 1f9ec16ee2..717261c34b 100644
--- a/tsdb/index/tsi1/file_set.go
+++ b/tsdb/index/tsi1/file_set.go
@@ -24,7 +24,11 @@ type FileSet struct {
 
 // NewFileSet returns a new instance of FileSet.
 func NewFileSet(levels []CompactionLevel, files []File) (*FileSet, error) {
-	fs := &FileSet{levels: levels, files: files}
+	fs := &FileSet{
+		levels:  levels,
+		files:   files,
+		filters: make([]*bloom.Filter, len(levels)),
+	}
 	if err := fs.buildFilters(); err != nil {
 		return nil, err
 	}
@@ -56,9 +60,14 @@ func (fs *FileSet) Release() {
 	}
 }
 
-// Prepend returns a new file set with f added at the beginning.
-func (fs *FileSet) Prepend(f File) (*FileSet, error) {
-	return NewFileSet(fs.levels, append([]File{f}, fs.files...))
+// PrependLogFile returns a new file set with f added at the beginning.
+// Filters do not need to be rebuilt because log files have no bloom filter.
+func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet {
+	return &FileSet{
+		levels:  fs.levels,
+		files:   append([]File{f}, fs.files...),
+		filters: fs.filters,
+	}
 }
 
 // MustReplace swaps a list of files for a single file and returns a new file set.
@@ -89,11 +98,32 @@ func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
 	other[i] = newFile
 	copy(other[i+1:], fs.files[i+len(oldFiles):])
 
-	fs, err := NewFileSet(fs.levels, other)
-	if err != nil {
+	// Copy existing bloom filters.
+	filters := make([]*bloom.Filter, len(fs.filters))
+	copy(filters, fs.filters)
+
+	// Merge new file into existing filter.
+	if filters[newFile.Level()] == nil {
+		filters[newFile.Level()] = newFile.Filter()
+	} else {
+		filters[newFile.Level()].Merge(newFile.Filter())
+	}
+
+	// Clear filters at replaced file levels.
+	for _, f := range oldFiles {
+		filters[f.Level()] = nil
+	}
+
+	// Build new fileset and rebuild changed filters.
+	newFS := &FileSet{
+		levels:  fs.levels,
+		files:   other,
+		filters: filters,
+	}
+	if err := newFS.buildFilters(); err != nil {
 		panic("cannot build file set: " + err.Error())
 	}
-	return fs
+	return newFS
 }
 
 // MaxID returns the highest file identifier.
@@ -913,31 +943,38 @@ func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf
 // buildFilters builds a series existence filter for each compaction level.
 func (fs *FileSet) buildFilters() error {
 	if len(fs.levels) == 0 {
-		fs.filters = nil
 		return nil
 	}
 
-	// Generate filters for each level.
-	fs.filters = make([]*bloom.Filter, len(fs.levels))
-
-	// Merge filters at each level.
-	for _, f := range fs.files {
-		level := f.Level()
-
-		// Skip if file has no bloom filter.
-		if f.Filter() == nil {
+	// Build filters for each level where the filter is non-existent.
+	files := fs.files
+	for level := range fs.levels {
+		// Clear filter if level doesn't exist.
+		if level == 0 || len(files) == 0 || files[0].Level() > level {
+			fs.filters[level] = nil
 			continue
 		}
 
-		// Initialize a filter if it doesn't exist.
-		if fs.filters[level] == nil {
-			lvl := fs.levels[level]
-			fs.filters[level] = bloom.NewFilter(lvl.M, lvl.K)
+		// Skip files at this level if filter already exists.
+		if fs.filters[level] != nil {
+			for len(files) > 0 {
+				if files[0].Level() > level {
+					break
+				}
+				files = files[1:]
+			}
 		}
 
-		// Merge filter.
-		if err := fs.filters[level].Merge(f.Filter()); err != nil {
-			return err
+		// Build new filter from files at this level.
+		fs.filters[level] = bloom.NewFilter(fs.levels[level].M, fs.levels[level].K)
+		for len(files) > 0 {
+			if files[0].Level() != level {
+				break
+			}
+			if err := fs.filters[level].Merge(files[0].Filter()); err != nil {
+				return err
+			}
+			files = files[1:]
 		}
 	}
 
diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go
index 239cbb4048..789121c8a5 100644
--- a/tsdb/index/tsi1/index.go
+++ b/tsdb/index/tsi1/index.go
@@ -342,11 +342,7 @@ func (i *Index) prependActiveLogFile() error {
 	i.activeLogFile = f
 
 	// Prepend and generate new fileset.
-	fs, err := i.fileSet.Prepend(f)
-	if err != nil {
-		return err
-	}
-	i.fileSet = fs
+	i.fileSet = i.fileSet.PrependLogFile(f)
 
 	// Write new manifest.
 	if err := i.writeManifestFile(); err != nil {