Incrementally rebuild tsi bloom filters.
parent
a5a2957567
commit
0ec2736f23
|
@ -24,7 +24,11 @@ type FileSet struct {
|
|||
|
||||
// NewFileSet returns a new instance of FileSet.
|
||||
func NewFileSet(levels []CompactionLevel, files []File) (*FileSet, error) {
|
||||
fs := &FileSet{levels: levels, files: files}
|
||||
fs := &FileSet{
|
||||
levels: levels,
|
||||
files: files,
|
||||
filters: make([]*bloom.Filter, len(levels)),
|
||||
}
|
||||
if err := fs.buildFilters(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -56,9 +60,14 @@ func (fs *FileSet) Release() {
|
|||
}
|
||||
}
|
||||
|
||||
// Prepend returns a new file set with f added at the beginning.
|
||||
func (fs *FileSet) Prepend(f File) (*FileSet, error) {
|
||||
return NewFileSet(fs.levels, append([]File{f}, fs.files...))
|
||||
// PrependLogFile returns a new file set with f added at the beginning.
|
||||
// Filters do not need to be rebuilt because log files have no bloom filter.
|
||||
func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet {
|
||||
return &FileSet{
|
||||
levels: fs.levels,
|
||||
files: append([]File{f}, fs.files...),
|
||||
filters: fs.filters,
|
||||
}
|
||||
}
|
||||
|
||||
// MustReplace swaps a list of files for a single file and returns a new file set.
|
||||
|
@ -89,11 +98,32 @@ func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
|
|||
other[i] = newFile
|
||||
copy(other[i+1:], fs.files[i+len(oldFiles):])
|
||||
|
||||
fs, err := NewFileSet(fs.levels, other)
|
||||
if err != nil {
|
||||
// Copy existing bloom filters.
|
||||
filters := make([]*bloom.Filter, len(fs.filters))
|
||||
copy(filters, fs.filters)
|
||||
|
||||
// Merge new file into existing filter.
|
||||
if filters[newFile.Level()] == nil {
|
||||
filters[newFile.Level()] = newFile.Filter()
|
||||
} else {
|
||||
filters[newFile.Level()].Merge(newFile.Filter())
|
||||
}
|
||||
|
||||
// Clear filters at replaced file levels.
|
||||
for _, f := range oldFiles {
|
||||
filters[f.Level()] = nil
|
||||
}
|
||||
|
||||
// Build new fileset and rebuild changed filters.
|
||||
newFS := &FileSet{
|
||||
levels: fs.levels,
|
||||
files: other,
|
||||
filters: filters,
|
||||
}
|
||||
if err := newFS.buildFilters(); err != nil {
|
||||
panic("cannot build file set: " + err.Error())
|
||||
}
|
||||
return fs
|
||||
return newFS
|
||||
}
|
||||
|
||||
// MaxID returns the highest file identifier.
|
||||
|
@ -913,32 +943,39 @@ func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf
|
|||
// buildFilters builds a series existence filter for each compaction level.
|
||||
func (fs *FileSet) buildFilters() error {
|
||||
if len(fs.levels) == 0 {
|
||||
fs.filters = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Generate filters for each level.
|
||||
fs.filters = make([]*bloom.Filter, len(fs.levels))
|
||||
|
||||
// Merge filters at each level.
|
||||
for _, f := range fs.files {
|
||||
level := f.Level()
|
||||
|
||||
// Skip if file has no bloom filter.
|
||||
if f.Filter() == nil {
|
||||
// Build filters for each level where the filter is non-existent.
|
||||
files := fs.files
|
||||
for level := range fs.levels {
|
||||
// Clear filter if level doesn't exist.
|
||||
if level == 0 || len(files) == 0 || files[0].Level() > level {
|
||||
fs.filters[level] = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Initialize a filter if it doesn't exist.
|
||||
if fs.filters[level] == nil {
|
||||
lvl := fs.levels[level]
|
||||
fs.filters[level] = bloom.NewFilter(lvl.M, lvl.K)
|
||||
// Skip files at this level if filter already exists.
|
||||
if fs.filters[level] != nil {
|
||||
for len(files) > 0 {
|
||||
if files[0].Level() > level {
|
||||
break
|
||||
}
|
||||
files = files[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// Merge filter.
|
||||
if err := fs.filters[level].Merge(f.Filter()); err != nil {
|
||||
// Build new filter from files at this level.
|
||||
fs.filters[level] = bloom.NewFilter(fs.levels[level].M, fs.levels[level].K)
|
||||
for len(files) > 0 {
|
||||
if files[0].Level() != level {
|
||||
break
|
||||
}
|
||||
if err := fs.filters[level].Merge(files[0].Filter()); err != nil {
|
||||
return err
|
||||
}
|
||||
files = files[1:]
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -342,11 +342,7 @@ func (i *Index) prependActiveLogFile() error {
|
|||
i.activeLogFile = f
|
||||
|
||||
// Prepend and generate new fileset.
|
||||
fs, err := i.fileSet.Prepend(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i.fileSet = fs
|
||||
i.fileSet = i.fileSet.PrependLogFile(f)
|
||||
|
||||
// Write new manifest.
|
||||
if err := i.writeManifestFile(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue