test: check post-compaction series file sizes before reopening (#22534)
parent
59f0f13fe4
commit
233f277c07
|
@ -227,6 +227,10 @@ func Test_BuildTSI_Invalid_Compact_Series_Specific_Shard(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_BuildTSI_Valid_Compact_Series(t *testing.T) {
|
func Test_BuildTSI_Valid_Compact_Series(t *testing.T) {
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
t.Skip("mmap implementation on Windows prevents series-file from shrinking during compaction")
|
||||||
|
}
|
||||||
|
|
||||||
tempDir := newTempDirectory(t, "", "build-tsi")
|
tempDir := newTempDirectory(t, "", "build-tsi")
|
||||||
defer os.RemoveAll(tempDir)
|
defer os.RemoveAll(tempDir)
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/models"
|
"github.com/influxdata/influxdb/v2/models"
|
||||||
"github.com/influxdata/influxdb/v2/tsdb"
|
"github.com/influxdata/influxdb/v2/tsdb"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
@ -179,6 +180,22 @@ func TestSeriesFile_Compaction(t *testing.T) {
|
||||||
sfile := MustOpenSeriesFile(t)
|
sfile := MustOpenSeriesFile(t)
|
||||||
defer sfile.Close()
|
defer sfile.Close()
|
||||||
|
|
||||||
|
var segmentPaths []string
|
||||||
|
for _, p := range sfile.Partitions() {
|
||||||
|
for _, ss := range p.Segments() {
|
||||||
|
segmentPaths = append(segmentPaths, ss.Path())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sfileSize := func() (res int64) {
|
||||||
|
for _, p := range segmentPaths {
|
||||||
|
fi, err := os.Stat(p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
res += fi.Size()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Generate a bunch of keys.
|
// Generate a bunch of keys.
|
||||||
var mms [][]byte
|
var mms [][]byte
|
||||||
var tagSets []models.Tags
|
var tagSets []models.Tags
|
||||||
|
@ -189,70 +206,57 @@ func TestSeriesFile_Compaction(t *testing.T) {
|
||||||
|
|
||||||
// Add all to the series file.
|
// Add all to the series file.
|
||||||
ids, err := sfile.CreateSeriesListIfNotExists(mms, tagSets)
|
ids, err := sfile.CreateSeriesListIfNotExists(mms, tagSets)
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete a subset of keys.
|
// Delete a subset of keys.
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
if i%10 == 0 {
|
if i%10 == 0 {
|
||||||
if err := sfile.DeleteSeriesID(id); err != nil {
|
require.NoError(t, sfile.DeleteSeriesID(id))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute total size of all series data.
|
// Check total series count.
|
||||||
origSize, err := sfile.FileSize()
|
require.Equal(t, 1000, int(sfile.SeriesCount()))
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compact all segments.
|
// Compact all segments.
|
||||||
var paths []string
|
var paths []string
|
||||||
for _, p := range sfile.Partitions() {
|
for _, p := range sfile.Partitions() {
|
||||||
for _, ss := range p.Segments() {
|
for _, ss := range p.Segments() {
|
||||||
if err := ss.CompactToPath(ss.Path()+".tmp", p.Index()); err != nil {
|
require.NoError(t, ss.CompactToPath(ss.Path()+".tmp", p.Index()))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
paths = append(paths, ss.Path())
|
paths = append(paths, ss.Path())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close index.
|
// Close index.
|
||||||
if err := sfile.SeriesFile.Close(); err != nil {
|
require.NoError(t, sfile.SeriesFile.Close())
|
||||||
t.Fatal(err)
|
|
||||||
}
|
// Compute total size of all series data.
|
||||||
|
origSize := sfileSize()
|
||||||
|
|
||||||
// Overwrite files.
|
// Overwrite files.
|
||||||
for _, path := range paths {
|
for _, path := range paths {
|
||||||
if err := os.Rename(path+".tmp", path); err != nil {
|
require.NoError(t, os.Rename(path+".tmp", path))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check size of compacted series data.
|
||||||
|
// We do this before reopening the index because on Windows, opening+mmap'ing the series
|
||||||
|
// file will cause the file to grow back to its original size.
|
||||||
|
newSize := sfileSize()
|
||||||
|
|
||||||
|
// Verify new size is smaller.
|
||||||
|
require.Greater(t, origSize, newSize)
|
||||||
|
|
||||||
// Reopen index.
|
// Reopen index.
|
||||||
sfile.SeriesFile = tsdb.NewSeriesFile(sfile.SeriesFile.Path())
|
sfile.SeriesFile = tsdb.NewSeriesFile(sfile.SeriesFile.Path())
|
||||||
if err := sfile.SeriesFile.Open(); err != nil {
|
require.NoError(t, sfile.SeriesFile.Open())
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure series status is correct.
|
// Ensure series status is correct.
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
if got, want := sfile.IsDeleted(id), (i%10) == 0; got != want {
|
require.Equal(t, (i%10) == 0, sfile.IsDeleted(id))
|
||||||
t.Fatalf("IsDeleted(%d)=%v, want %v", id, got, want)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify new size is smaller.
|
// Check total series count.
|
||||||
newSize, err := sfile.FileSize()
|
require.Equal(t, 900, int(sfile.SeriesCount()))
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if newSize >= origSize {
|
|
||||||
t.Fatalf("expected new size (%d) to be smaller than original size (%d)", newSize, origSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("original size: %d, new size: %d", origSize, newSize)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var cachedCompactionSeriesFile *SeriesFile
|
var cachedCompactionSeriesFile *SeriesFile
|
||||||
|
|
Loading…
Reference in New Issue