feat: check for uncommitted WRR segments during startup (#25560)
Check for uncommitted WRR segments during startup and abort startup
if found.
Closes: #25559
(cherry picked from commit 037c6af6e8)
pull/25577/head
parent
941a41bbef
commit
78e1d77a80
|
@ -12,6 +12,7 @@ import (
|
|||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -32,6 +33,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
// ErrIncompatibleWAL is returned if incompatible WAL files are detected.
|
||||
ErrIncompatibleWAL = errors.New("incompatible WAL format")
|
||||
// ErrShardNotFound is returned when trying to get a non existing shard.
|
||||
ErrShardNotFound = fmt.Errorf("shard not found")
|
||||
// ErrStoreClosed is returned when trying to use a closed Store.
|
||||
|
@ -308,6 +311,76 @@ func (s *Store) Open(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Name components used to recognize WRR WAL segment files and their snapshots.
const (
	// wrrFileExtension is the extension of a WRR segment file, without the leading dot.
	wrrFileExtension = "wrr"
	// wrrPrefixVersioned is the prefix the segment glob requires at the start of a file name.
	wrrPrefixVersioned = "_v"
	// wrrSnapshotExtension is appended, after a dot, to a segment name to form a snapshot name.
	wrrSnapshotExtension = "snapshot"
)

// generateWRRSegmentFileGlob returns the glob pattern used to locate .wrr files and
// their related files (such as .wrr.snapshot) inside a WAL directory. The pattern is
// deliberately loose: the trailing wildcard also matches unrelated names that merely
// contain ".wrr", so callers must filter the matches.
func generateWRRSegmentFileGlob() string {
	// Layout: <versioned prefix> * . <extension> *
	const layout = "%s*.%s*"
	return fmt.Sprintf(layout, wrrPrefixVersioned, wrrFileExtension)
}
|
||||
|
||||
// checkUncommittedWRR determines if there are any uncommitted WRR files found in shardWALPath.
|
||||
// shardWALPath is the path to a single shard's WAL, not the overall WAL path.
|
||||
// If no uncommitted WRR files are found, then nil is returned. Otherwise, an error indicating
|
||||
// the names of uncommitted WRR files is returned. The error returned contains the full context
|
||||
// and does not require additional information.
|
||||
func checkUncommittedWRR(shardWALPath string) error {
|
||||
// It is OK if there are .wrr files as long as they are committed. Committed .wrr files will
|
||||
// have a .wrr.snapshot newer than the .wrr file. If there is no .wrr.snapshot file newer
|
||||
// than a given .wrr file, then that .wrr file is uncommitted and we should return an error
|
||||
// indicating possible data loss due to an in-place conversion of an incompatible WAL format.
|
||||
// Note that newness for .wrr and .wrr.snapshot files is determined lexically by the name,
|
||||
// and not the ctime or mtime of the files.
|
||||
|
||||
unfilteredNames, err := filepath.Glob(filepath.Join(shardWALPath, generateWRRSegmentFileGlob()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error finding WRR files in %q: %w", shardWALPath, err)
|
||||
}
|
||||
snapshotExt := fmt.Sprintf(".%s.%s", wrrFileExtension, wrrSnapshotExtension)
|
||||
|
||||
// Strip out files that are not .wal or .wal.snapshot, given the glob pattern
|
||||
// could include false positives, such as foo.wally or foo.wal.snapshotted
|
||||
names := make([]string, 0, len(unfilteredNames))
|
||||
for _, name := range unfilteredNames {
|
||||
if strings.HasSuffix(name, wrrFileExtension) || strings.HasSuffix(name, snapshotExt) {
|
||||
names = append(names, name)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(names)
|
||||
|
||||
// Find the last snapshot and collect the files after it
|
||||
for i := len(names) - 1; i >= 0; i-- {
|
||||
if strings.HasSuffix(names[i], snapshotExt) {
|
||||
names = names[i+1:]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// names now contains a list of uncommitted WRR files.
|
||||
if len(names) > 0 {
|
||||
return fmt.Errorf("%w: uncommitted WRR files found: %v", ErrIncompatibleWAL, names)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkWALCompatibility ensures that an uncommitted WAL segments in an incompatible
|
||||
// format are not present. shardWALPath is the path to a single shard's WAL, not the
|
||||
// overall WAL path. A ErrIncompatibleWAL error with further details is returned if
|
||||
// an incompatible WAL with unflushed segments is found, The error returned contains
|
||||
// the full context and does not require additional information.
|
||||
func checkWALCompatibility(shardWALPath string) error {
|
||||
// There is one known incompatible WAL format, the .wrr format. Finding these is a problem
|
||||
// if they are uncommitted. OSS can not read .wrr WAL files, so any uncommitted data in them
|
||||
// will be lost.
|
||||
return checkUncommittedWRR(shardWALPath)
|
||||
}
|
||||
|
||||
// generateTrailingPath returns the last part of a shard path or WAL path
|
||||
// based on the shardID, db, and rp.
|
||||
func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string {
|
||||
|
@ -507,6 +580,21 @@ func (s *Store) loadShards(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Verify no incompatible WAL files. Do this before starting to load shards to fail early if found.
|
||||
// All shards are scanned instead of stopping at just the first one so that the admin will see
|
||||
// all the problematic shards.
|
||||
if s.EngineOptions.WALEnabled {
|
||||
var errs []error
|
||||
for _, sh := range shards {
|
||||
if err := checkWALCompatibility(s.generateWALPath(sh.id, sh.db, sh.rp)); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
}
|
||||
|
||||
// Do the actual work of loading shards.
|
||||
shardResC := make(chan *shardResponse, len(shards))
|
||||
pendingShardCount := 0
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/pkg/deep"
|
||||
"github.com/influxdata/influxdb/v2/pkg/slices"
|
||||
"github.com/influxdata/influxdb/v2/pkg/snowflake"
|
||||
"github.com/influxdata/influxdb/v2/predicate"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
|
@ -886,6 +887,122 @@ func TestStore_FlushWALOnClose(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStore_WRRSegments(t *testing.T) {
|
||||
// Check if uncommitted WRR segments are identified and cause opening the store to abort.
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run("TestStore_WRRSegments_"+index, func(t *testing.T) {
|
||||
idGen := snowflake.New(0)
|
||||
generateWRRSegmentName := func() string {
|
||||
return fmt.Sprintf("_v01_%020d.wrr", idGen.Next())
|
||||
}
|
||||
createFile := func(t *testing.T, fn string) {
|
||||
t.Helper()
|
||||
require.NoError(t, os.WriteFile(fn, nil, 0666))
|
||||
}
|
||||
createWRR := func(t *testing.T, path string) string {
|
||||
t.Helper()
|
||||
fn := filepath.Join(path, generateWRRSegmentName())
|
||||
createFile(t, fn)
|
||||
return fn
|
||||
}
|
||||
generateWRRSnapshotName := func() string {
|
||||
return generateWRRSegmentName() + ".snapshot"
|
||||
}
|
||||
createWRRSnapshot := func(t *testing.T, path string) string {
|
||||
t.Helper()
|
||||
fn := filepath.Join(path, generateWRRSnapshotName())
|
||||
createFile(t, fn)
|
||||
return fn
|
||||
}
|
||||
checkWRRError := func(t *testing.T, err error, wrrs ...[]string) {
|
||||
t.Helper()
|
||||
require.ErrorIs(t, err, tsdb.ErrIncompatibleWAL)
|
||||
require.ErrorContains(t, err, "incompatible WAL format: uncommitted WRR files found")
|
||||
// We don't know the exact order of the errors if there are multiple shards with
|
||||
// uncommitted WRRs, but this will insure that all of them are included in the error
|
||||
// message.
|
||||
for _, w := range wrrs {
|
||||
if len(w) > 0 {
|
||||
require.ErrorContains(t, err, fmt.Sprintf("%v", w))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s := MustOpenStore(t, index, WithWALFlushOnShutdown(true))
|
||||
defer s.Close()
|
||||
|
||||
// Create shard #0 with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 0,
|
||||
`cpu,host=serverA value=1 0`,
|
||||
`cpu,host=serverA value=2 10`,
|
||||
`cpu,host=serverB value=3 20`,
|
||||
)
|
||||
|
||||
// Create shard #1 with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 1,
|
||||
`cpu,host=serverA value=1 30`,
|
||||
`cpu,host=serverC value=3 60`,
|
||||
)
|
||||
|
||||
sh0WALPath := filepath.Join(s.walPath, "db0", "rp0", "0")
|
||||
require.DirExists(t, sh0WALPath)
|
||||
sh1WALPath := filepath.Join(s.walPath, "db0", "rp0", "1")
|
||||
require.DirExists(t, sh1WALPath)
|
||||
|
||||
// No WRR segments, no error
|
||||
require.NoError(t, s.Reopen(t))
|
||||
|
||||
// 1 uncommitted WRR segment in shard 0
|
||||
var sh0Uncommitted, sh1Uncommitted []string
|
||||
checkReopen := func(t *testing.T) {
|
||||
t.Helper()
|
||||
allUncommitted := [][]string{sh0Uncommitted, sh1Uncommitted}
|
||||
var hasUncommitted bool
|
||||
for _, u := range allUncommitted {
|
||||
if len(u) > 0 {
|
||||
hasUncommitted = true
|
||||
}
|
||||
}
|
||||
|
||||
if hasUncommitted {
|
||||
checkWRRError(t, s.Reopen(t), allUncommitted...)
|
||||
} else {
|
||||
require.NoError(t, s.Reopen(t))
|
||||
}
|
||||
}
|
||||
sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath))
|
||||
checkReopen(t)
|
||||
|
||||
// 2 uncommitted WRR segments in shard 0
|
||||
sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath))
|
||||
checkReopen(t)
|
||||
|
||||
// 2 uncommitted WR segments in shard 0, 1 in shard 1
|
||||
sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath))
|
||||
checkReopen(t)
|
||||
|
||||
// No uncommitted WRR in shard 0, 1 in shard 1
|
||||
createWRRSnapshot(t, sh0WALPath)
|
||||
sh0Uncommitted = nil
|
||||
checkReopen(t)
|
||||
|
||||
// No uncommitted WRR segments
|
||||
createWRRSnapshot(t, sh1WALPath)
|
||||
sh1Uncommitted = nil
|
||||
checkReopen(t)
|
||||
|
||||
// Add 1 uncommitted to shard 1
|
||||
sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath))
|
||||
checkReopen(t)
|
||||
|
||||
// No uncommitted WRR segments
|
||||
createWRRSnapshot(t, sh1WALPath)
|
||||
sh1Uncommitted = nil
|
||||
checkReopen(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test new reader blocking.
|
||||
func TestStore_NewReadersBlocked(t *testing.T) {
|
||||
//t.Parallel()
|
||||
|
|
Loading…
Reference in New Issue