fix(tsi): clean up FileSet fields (#18961)
parent 3a11610d47
commit 108e2600b3
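This commit trims tsi1.FileSet down to just its file list: the compaction levels, series file, and manifest size are removed from the struct, NewFileSet takes only []File and no longer returns an error, and the SeriesFile/LogFiles/IndexFiles accessors are dropped. The standalone sketch below mirrors that reduced shape; File, fakeFile, and the field layout are illustrative stand-ins, not the real influxdb code.

package main

import "fmt"

// File is a stand-in for the tsi1 file interface; only Size is modeled here.
type File interface {
	Size() int64
}

// FileSet mirrors the reduced struct from the diff: only the file list remains.
// Compaction levels, the series file, and the manifest size now live with the caller.
type FileSet struct {
	files []File
}

// NewFileSet mirrors the simplified constructor: no error to check anymore.
func NewFileSet(files []File) *FileSet {
	return &FileSet{files: files}
}

// Size sums the file sizes; the manifest size is no longer added in.
func (fs *FileSet) Size() int64 {
	var total int64
	for _, f := range fs.files {
		total += f.Size()
	}
	return total
}

// fakeFile is a trivial File implementation for the example.
type fakeFile struct{ n int64 }

func (f fakeFile) Size() int64 { return f.n }

func main() {
	fs := NewFileSet([]File{fakeFile{n: 10}, fakeFile{n: 32}})
	fmt.Println(fs.Size()) // 42
}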
@@ -214,10 +214,7 @@ func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.File
 		}
 	}
 
-	fs, err := tsi1.NewFileSet(nil, sfile, files)
-	if err != nil {
-		return nil, nil, err
-	}
+	fs := tsi1.NewFileSet(files)
 	fs.Retain()
 
 	return nil, fs, nil

@@ -15,27 +15,19 @@ import (
 
 // FileSet represents a collection of files.
 type FileSet struct {
-	levels       []CompactionLevel
-	sfile        *tsdb.SeriesFile
-	files        []File
-	manifestSize int64 // Size of the manifest file in bytes.
+	files []File
 }
 
 // NewFileSet returns a new instance of FileSet.
-func NewFileSet(levels []CompactionLevel, sfile *tsdb.SeriesFile, files []File) (*FileSet, error) {
+func NewFileSet(files []File) *FileSet {
 	return &FileSet{
-		levels: levels,
-		sfile:  sfile,
-		files:  files,
-	}, nil
+		files: files,
+	}
 }
 
 // bytes estimates the memory footprint of this FileSet, in bytes.
 func (fs *FileSet) bytes() int {
 	var b int
-	for _, level := range fs.levels {
-		b += int(unsafe.Sizeof(level))
-	}
 	// Do not count SeriesFile because it belongs to the code that constructed this FileSet.
 	for _, file := range fs.files {
 		b += file.bytes()

@@ -69,16 +61,11 @@ func (fs *FileSet) Release() {
 	}
 }
 
-// SeriesFile returns the attached series file.
-func (fs *FileSet) SeriesFile() *tsdb.SeriesFile { return fs.sfile }
-
 // PrependLogFile returns a new file set with f added at the beginning.
 // Filters do not need to be rebuilt because log files have no bloom filter.
 func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet {
 	return &FileSet{
-		levels: fs.levels,
-		sfile:  fs.sfile,
-		files:  append([]File{f}, fs.files...),
+		files: append([]File{f}, fs.files...),
 	}
 }
 

@@ -88,7 +75,7 @@ func (fs *FileSet) Size() int64 {
 	for _, f := range fs.files {
 		total += f.Size()
 	}
-	return total + int64(fs.manifestSize)
+	return total
 }
 
 // MustReplace swaps a list of files for a single file and returns a new file set.

@@ -121,8 +108,7 @@ func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
 
 	// Build new fileset and rebuild changed filters.
 	return &FileSet{
-		levels: fs.levels,
-		files:  other,
+		files: other,
 	}
 }
 

@@ -142,28 +128,6 @@ func (fs *FileSet) Files() []File {
 	return fs.files
 }
 
-// LogFiles returns all log files from the file set.
-func (fs *FileSet) LogFiles() []*LogFile {
-	var a []*LogFile
-	for _, f := range fs.files {
-		if f, ok := f.(*LogFile); ok {
-			a = append(a, f)
-		}
-	}
-	return a
-}
-
-// IndexFiles returns all index files from the file set.
-func (fs *FileSet) IndexFiles() []*IndexFile {
-	var a []*IndexFile
-	for _, f := range fs.files {
-		if f, ok := f.(*IndexFile); ok {
-			a = append(a, f)
-		}
-	}
-	return a
-}
-
 // LastContiguousIndexFilesByLevel returns the last contiguous files by level.
 // These can be used by the compaction scheduler.
 func (fs *FileSet) LastContiguousIndexFilesByLevel(level int) []*IndexFile {

@@ -32,11 +32,11 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
 	}
 	defer fs.Release()
 
-	itr := fs.SeriesFile().SeriesIDIterator()
+	itr := idx.SeriesFile.SeriesIDIterator()
 	if itr == nil {
 		t.Fatal("expected iterator")
 	}
-	if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
+	if result := MustReadAllSeriesIDIteratorString(idx.SeriesFile.SeriesFile, itr); !reflect.DeepEqual(result, []string{
 		"cpu,[{region east}]",
 		"cpu,[{region west}]",
 		"mem,[{region east}]",

@@ -62,12 +62,12 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
 	}
 	defer fs.Release()
 
-	itr := fs.SeriesFile().SeriesIDIterator()
+	itr := idx.SeriesFile.SeriesIDIterator()
 	if itr == nil {
 		t.Fatal("expected iterator")
 	}
 
-	if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
+	if result := MustReadAllSeriesIDIteratorString(idx.SeriesFile.SeriesFile, itr); !reflect.DeepEqual(result, []string{
 		"cpu,[{region east}]",
 		"cpu,[{region north}]",
 		"cpu,[{region west}]",

@@ -106,7 +106,7 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
 		t.Fatal("expected iterator")
 	}
 
-	if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
+	if result := MustReadAllSeriesIDIteratorString(idx.SeriesFile.SeriesFile, itr); !reflect.DeepEqual(result, []string{
 		"cpu,[{region east}]",
 		"cpu,[{region west}]",
 	}) {

@@ -135,7 +135,7 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
 		t.Fatalf("expected iterator")
 	}
 
-	if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
+	if result := MustReadAllSeriesIDIteratorString(idx.SeriesFile.SeriesFile, itr); !reflect.DeepEqual(result, []string{
 		"cpu,[{region east}]",
 		"cpu,[{region north}]",
 		"cpu,[{region west}]",

@@ -1156,7 +1156,7 @@ func (i *Index) RetainFileSet() (*FileSet, error) {
 	i.mu.RLock()
 	defer i.mu.RUnlock()
 
-	fs, _ := NewFileSet(nil, i.sfile, nil)
+	fs := NewFileSet(nil)
 	for _, p := range i.partitions {
 		pfs, err := p.RetainFileSet()
 		if err != nil {

@@ -211,11 +211,7 @@ func (p *Partition) Open() error {
 			files = append(files, f)
 		}
 	}
-	fs, err := NewFileSet(p.levels, p.sfile, files)
-	if err != nil {
-		return err
-	}
-	p.fileSet = fs
+	p.fileSet = NewFileSet(files)
 
 	// Set initial sequence number.
 	p.seq = p.fileSet.MaxID()
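With SeriesFile() gone from FileSet, call sites that used fs.SeriesFile() now go through whatever owns the series file; the test hunks above switch to idx.SeriesFile directly. Below is a small illustrative sketch of that ownership shift; Index, SeriesFile, and SeriesIDs are stand-ins for the real tsdb/tsi1 types, not their actual APIs.

package main

import "fmt"

// SeriesFile stands in for the series file; it just hands back some IDs.
type SeriesFile struct{}

func (s *SeriesFile) SeriesIDs() []uint64 { return []uint64{1, 2, 3} }

// FileSet carries only index/log files now; it has no series-file field.
type FileSet struct{}

// Index owns both the series file and the file set, so lookups that need
// series data ask the index (or the series file it holds), not the FileSet.
type Index struct {
	SeriesFile *SeriesFile
	fileSet    *FileSet
}

func main() {
	idx := &Index{SeriesFile: &SeriesFile{}, fileSet: &FileSet{}}

	// Before: something like fs.SeriesFile().SeriesIDs().
	// After: reach the series file through its owner.
	fmt.Println(idx.SeriesFile.SeriesIDs())
}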