fix(tsdb): allow backups during snapshotting, and don't leak tmp files (#20527)

Co-authored-by: davidby-influx <dbyrne@influxdata.com>

branch: pull/20539/head
parent: 4239d037d6
commit: 743aef4a98
@@ -47,6 +47,8 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
 1. [20489](https://github.com/influxdata/influxdb/pull/20489): Improve error message when opening BoltDB with unsupported file system options.
 1. [20490](https://github.com/influxdata/influxdb/pull/20490): Fix silent failure to register CLI args as required.
 1. [20522](https://github.com/influxdata/influxdb/pull/20522): Fix loading config when INFLUXD_CONFIG_PATH points to a `.yml` file.
+1. [20527](https://github.com/influxdata/influxdb/pull/20527): Don't leak .tmp files while backing up shards.
+1. [20527](https://github.com/influxdata/influxdb/pull/20527): Allow backups to complete while a snapshot is in progress.

 ## v2.0.3 [2020-12-14]
@@ -41,7 +41,7 @@ type Engine interface {
 	LoadMetadataIndex(shardID uint64, index Index) error

-	CreateSnapshot() (string, error)
+	CreateSnapshot(skipCacheOk bool) (string, error)
 	Backup(w io.Writer, basePath string, since time.Time) error
 	Export(w io.Writer, basePath string, start time.Time, end time.Time) error
 	Restore(r io.Reader, basePath string) error
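The signature change above ripples through every implementer and caller in this diff. As a caller-side sketch of the new contract (the wrapper function and `copyOut` are hypothetical, not part of this commit):

```go
package main

import (
	"os"

	"github.com/influxdata/influxdb/v2/tsdb"
)

// snapshotAndCopy is a hypothetical caller of the updated interface method.
func snapshotAndCopy(e tsdb.Engine, copyOut func(src string) error) error {
	// skipCacheOk=false: require the in-memory cache to be written out;
	// a busy snapshotter then surfaces as an error instead of being skipped.
	path, err := e.CreateSnapshot(false)
	if err != nil {
		return err
	}
	// The caller owns the returned temp directory and must remove it;
	// leaving it behind is exactly the kind of .tmp leak this commit addresses.
	defer os.RemoveAll(path)

	return copyOut(path)
}
```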
@@ -908,25 +908,14 @@ func (e *Engine) Free() error {
 // of the files in the archive. It will force a snapshot of the WAL first
 // then perform the backup with a read lock against the file store. This means
 // that new TSM files will not be able to be created in this shard while the
-// backup is running. For shards that are still acively getting writes, this
-// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes.
+// backup is running. For shards that are still actively getting writes, this
+// could cause the WAL to backup, increasing memory usage and eventually rejecting writes.
 func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
-	var err error
-	var path string
-	for i := 0; i < 3; i++ {
-		path, err = e.CreateSnapshot()
-		if err != nil {
-			switch err {
-			case ErrSnapshotInProgress:
-				backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
-				time.Sleep(backoff)
-			default:
-				return err
-			}
-		}
-	}
-	if err == ErrSnapshotInProgress {
-		e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.")
+	path, err := e.CreateSnapshot(true)
+	if err != nil {
+		return err
 	}
 	// Remove the temporary snapshot dir
 	defer os.RemoveAll(path)
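Two details of the removed loop are worth noting, since they motivate both halves of the commit title. The loop has no break on success, so a snapshot that succeeded on the first pass appears to be followed by up to two more calls to CreateSnapshot, each overwriting path and orphaning the previous temp directory; those orphans are the leaked .tmp files. And after three ErrSnapshotInProgress failures, Backup logged a warning and carried on without snapshot contents. The rewrite moves the retry into CreateSnapshot and passes skipCacheOk=true, so Backup itself is now a single snapshot plus one deferred cleanup.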
@@ -990,7 +979,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
 }

 func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
-	path, err := e.CreateSnapshot()
+	path, err := e.CreateSnapshot(false)
 	if err != nil {
 		return err
 	}
@@ -1873,9 +1862,19 @@ func (e *Engine) WriteSnapshot() (err error) {
 }

 // CreateSnapshot will create a temp directory that holds
-// temporary hardlinks to the underylyng shard files.
-func (e *Engine) CreateSnapshot() (string, error) {
-	if err := e.WriteSnapshot(); err != nil {
+// temporary hardlinks to the underlying shard files.
+// skipCacheOk controls whether it is permissible to fail writing out
+// in-memory cache data when a previous snapshot is in progress.
+func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
+	err := e.WriteSnapshot()
+	for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 {
+		backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
+		time.Sleep(backoff)
+		err = e.WriteSnapshot()
+	}
+	if err == ErrSnapshotInProgress && skipCacheOk {
+		e.logger.Warn("Snapshotter busy: proceeding without cache contents")
+	} else if err != nil {
 		return "", err
 	}
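The backoff here is exponential in base 32, which is easy to misread as linear. A standalone sketch that just reproduces the arithmetic from the loop above, showing the schedule a blocked caller sees (four WriteSnapshot attempts in all: one up front plus three retries):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Same formula as the retry loop in CreateSnapshot: 32^i milliseconds.
	total := time.Duration(0)
	for i := 0; i < 3; i++ {
		backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
		total += backoff
		fmt.Printf("retry %d sleeps %v\n", i, backoff) // 1ms, 32ms, 1.024s
	}
	fmt.Println("worst-case added latency:", total) // 1.057s
}
```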
@@ -0,0 +1,102 @@
+package tsm1
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/influxdata/influxdb/v2/logger"
+	"github.com/influxdata/influxdb/v2/models"
+	"github.com/influxdata/influxdb/v2/tsdb"
+	"github.com/stretchr/testify/require"
+)
+
+func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
+	tmpDir, err := ioutil.TempDir("", "shard_test")
+	require.NoError(t, err, "error creating temporary directory")
+	defer os.RemoveAll(tmpDir)
+
+	tmpShard := filepath.Join(tmpDir, "shard")
+	tmpWal := filepath.Join(tmpDir, "wal")
+
+	sfile := NewSeriesFile(tmpDir)
+	defer sfile.Close()
+
+	opts := tsdb.NewEngineOptions()
+	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
+	opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})
+
+	sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile, opts)
+	require.NoError(t, sh.Open(), "error opening shard")
+	defer sh.Close()
+
+	points := make([]models.Point, 0, 10000)
+	for i := 0; i < cap(points); i++ {
+		points = append(points, models.MustNewPoint(
+			"cpu",
+			models.NewTags(map[string]string{"host": "server"}),
+			map[string]interface{}{"value": 1.0},
+			time.Unix(int64(i), 0),
+		))
+	}
+	err = sh.WritePoints(points)
+	require.NoError(t, err)
+
+	engineInterface, err := sh.Engine()
+	require.NoError(t, err, "error retrieving shard engine")
+
+	// Get the struct underlying the interface. Not a recommended practice.
+	realEngineStruct, ok := (engineInterface).(*Engine)
+	if !ok {
+		t.Log("Engine type does not permit simulating Cache race conditions")
+		return
+	}
+	// fake a race condition in snapshotting the cache.
+	realEngineStruct.Cache.snapshotting = true
+	defer func() {
+		realEngineStruct.Cache.snapshotting = false
+	}()
+
+	snapshotFunc := func(skipCacheOk bool) {
+		if f, err := sh.CreateSnapshot(skipCacheOk); err == nil {
+			require.NoError(t, os.RemoveAll(f), "error cleaning up TestEngine_ConcurrentShardSnapshots")
+		} else if err == ErrSnapshotInProgress {
+			if skipCacheOk {
+				t.Fatalf("failing to ignore this error: %s", err.Error())
+			}
+		} else {
+			t.Fatalf("error creating shard snapshot: %s", err.Error())
+		}
+	}
+
+	// Permit skipping cache in the snapshot
+	snapshotFunc(true)
+	// do not permit skipping the cache in the snapshot
+	snapshotFunc(false)
+	realEngineStruct.Cache.snapshotting = false
+}
+
+// NewSeriesFile returns a new instance of SeriesFile with a temporary file path.
+func NewSeriesFile(tmpDir string) *tsdb.SeriesFile {
+	dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-")
+	if err != nil {
+		panic(err)
+	}
+	f := tsdb.NewSeriesFile(dir)
+	f.Logger = logger.New(os.Stdout)
+	if err := f.Open(); err != nil {
+		panic(err)
+	}
+	return f
+}
+
+type seriesIDSets []*tsdb.SeriesIDSet
+
+func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
+	for _, v := range a {
+		f(v)
+	}
+	return nil
+}
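Note that the new test lives in package tsm1 rather than an external tsm1_test package precisely so it can set the unexported Cache.snapshotting field and hold a cache snapshot "in progress" without real concurrency. The type assertion to *Engine is the compromise the inline comment owns up to, which is why the test logs and returns, rather than failing, when the shard's engine is some other implementation.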
@@ -1133,12 +1133,12 @@ func (s *Shard) Import(r io.Reader, basePath string) error {

 // CreateSnapshot will return a path to a temp directory
 // containing hard links to the underlying shard files.
-func (s *Shard) CreateSnapshot() (string, error) {
+func (s *Shard) CreateSnapshot(skipCacheOk bool) (string, error) {
 	engine, err := s.Engine()
 	if err != nil {
 		return "", err
 	}
-	return engine.CreateSnapshot()
+	return engine.CreateSnapshot(skipCacheOk)
 }

 // ForEachMeasurementName iterates over each measurement in the shard.
@@ -415,7 +415,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
 			}

 			_ = sh.WritePoints(points[:500])
-			if f, err := sh.CreateSnapshot(); err == nil {
+			if f, err := sh.CreateSnapshot(false); err == nil {
 				os.RemoveAll(f)
 			}
@@ -431,7 +431,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
 			}

 			_ = sh.WritePoints(points[500:])
-			if f, err := sh.CreateSnapshot(); err == nil {
+			if f, err := sh.CreateSnapshot(false); err == nil {
 				os.RemoveAll(f)
 			}
 		}
@@ -629,13 +629,13 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en

 // CreateShardSnapShot will create a hard link to the underlying shard and return a path.
 // The caller is responsible for cleaning up (removing) the file path returned.
-func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
+func (s *Store) CreateShardSnapshot(id uint64, skipCacheOk bool) (string, error) {
 	sh := s.Shard(id)
 	if sh == nil {
 		return "", ErrShardNotFound
 	}

-	return sh.CreateSnapshot()
+	return sh.CreateSnapshot(skipCacheOk)
 }

 // SetShardEnabled enables or disables a shard for read and writes.
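The same ownership rule applies one level up: the doc comment above makes the caller responsible for removing the returned path. A hedged sketch of how a store-level backup caller might drive the new parameter (the function and its loop are illustrative, not code from this commit):

```go
package main

import (
	"os"

	"github.com/influxdata/influxdb/v2/tsdb"
)

// snapshotShards is a hypothetical caller of the updated Store method.
func snapshotShards(s *tsdb.Store, ids []uint64) error {
	for _, id := range ids {
		// skipCacheOk=true: tolerate a busy cache snapshotter and
		// back up whatever TSM files are already on disk.
		dir, err := s.CreateShardSnapshot(id, true)
		if err != nil {
			return err
		}
		// Per the doc comment, the caller must remove the returned
		// path once the snapshot has been copied out.
		if err := os.RemoveAll(dir); err != nil {
			return err
		}
	}
	return nil
}
```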
@@ -449,7 +449,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
 		t.Fatalf("expected shard")
 	}

-	dir, e := s.CreateShardSnapshot(1)
+	dir, e := s.CreateShardSnapshot(1, false)
 	if e != nil {
 		t.Fatal(e)
 	}