Merge pull request #6383 from Lyndon-Li/data-mover-backup-smoke-testing

Data mover backup smoke testing
pull/6374/head^2
lyndon 2023-06-13 11:13:14 +08:00 committed by GitHub
commit 8cd55d1826
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 66 additions and 32 deletions

View File

@ -227,7 +227,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *
return r.errorOut(ctx, du, err, "error to initialize data path", log) return r.errorOut(ctx, du, err, "error to initialize data path", log)
} }
log.WithField("path", path.ByPath).Info("fs init") log.WithField("path", path.ByPath).Info("fs init")
if err := fsBackup.StartBackup(path, "", false, nil); err != nil { if err := fsBackup.StartBackup(path, fmt.Sprintf("%s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, nil); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log) return r.errorOut(ctx, du, err, "error starting data path backup", log)
} }

View File

@ -232,7 +232,7 @@ func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNam
return nil return nil
} }
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
du := f.du du := f.du
original := f.du.DeepCopy() original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted

View File

@ -178,7 +178,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
} }
} }
if err := fsBackup.StartBackup(path, parentSnapshotID, false, pvb.Spec.Tags); err != nil { if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags); err != nil {
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log) return r.errorOut(ctx, &pvb, err, "error starting data path backup", log)
} }

View File

@ -103,7 +103,7 @@ func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace str
return nil return nil
} }
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
pvb := b.pvb pvb := b.pvb
original := b.pvb.DeepCopy() original := b.pvb.DeepCopy()

View File

@ -129,13 +129,13 @@ func (fs *fileSystemBR) Close(ctx context.Context) {
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
} }
func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
if !fs.initialized { if !fs.initialized {
return errors.New("file system data path is not initialized") return errors.New("file system data path is not initialized")
} }
go func() { go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs) snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs)
if err == provider.ErrorCanceled { if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)

View File

@ -95,12 +95,12 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t) mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
fs.uploaderProv = mockProvider fs.uploaderProv = mockProvider
fs.initialized = true fs.initialized = true
fs.callbacks = test.callbacks fs.callbacks = test.callbacks
err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", false, nil) err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil)
require.Equal(t, nil, err) require.Equal(t, nil, err)
<-finish <-finish

View File

@ -61,7 +61,7 @@ type AsyncBR interface {
Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error
// StartBackup starts an asynchronous data path instance for backup // StartBackup starts an asynchronous data path instance for backup
StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error
// StartRestore starts an asynchronous data path instance for restore // StartRestore starts an asynchronous data path instance for restore
StartRestore(snapshotID string, target AccessPoint) error StartRestore(snapshotID string, target AccessPoint) error

View File

@ -84,7 +84,7 @@ func setupDefaultPolicy() *policy.Tree {
} }
// Backup backup specific sourcePath and update progress // Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string,
forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil { if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader") return nil, false, errors.New("get empty kopia uploader")
@ -102,12 +102,18 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
return nil, true, nil return nil, true, nil
} }
dir = filepath.Clean(dir)
sourceInfo := snapshot.SourceInfo{ sourceInfo := snapshot.SourceInfo{
UserName: udmrepo.GetRepoUser(), UserName: udmrepo.GetRepoUser(),
Host: udmrepo.GetRepoDomain(), Host: udmrepo.GetRepoDomain(),
Path: filepath.Clean(dir), Path: filepath.Clean(realSource),
} }
rootDir, err := getLocalFSEntry(sourceInfo.Path) if sourceInfo.Path == "" {
sourceInfo.Path = dir
}
rootDir, err := getLocalFSEntry(dir)
if err != nil { if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
} }
@ -173,6 +179,8 @@ func SnapshotSource(
var previous []*snapshot.Manifest var previous []*snapshot.Manifest
if !forceFull { if !forceFull {
if parentSnapshot != "" { if parentSnapshot != "" {
log.Infof("Using provided parent snapshot %s", parentSnapshot)
mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot)) mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot))
if err != nil { if err != nil {
return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot) return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot)
@ -180,13 +188,21 @@ func SnapshotSource(
previous = append(previous, mani) previous = append(previous, mani)
} else { } else {
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil) log.Infof("Searching for parent snapshot")
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil, log)
if err != nil { if err != nil {
return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo) return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo)
} }
previous = pre previous = pre
} }
} else {
log.Info("Forcing full snapshot")
}
for i := range previous {
log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description)
} }
policyTree := setupDefaultPolicy() policyTree := setupDefaultPolicy()
@ -237,7 +253,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
// findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including // findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including
// last complete snapshot following it. // last complete snapshot following it.
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *fs.UTCTimestamp) ([]*snapshot.Manifest, error) { func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *fs.UTCTimestamp, log logrus.FieldLogger) ([]*snapshot.Manifest, error) {
man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo) man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil { if err != nil {
return nil, err return nil, err
@ -247,6 +263,8 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
var result []*snapshot.Manifest var result []*snapshot.Manifest
for _, p := range man { for _, p := range man {
log.Debugf("Found one snapshot %s, start time %v, incomplete %s, tags %v", p.ID, p.StartTime.ToTime(), p.IncompleteReason, p.Tags)
requestor, found := p.Tags[uploader.SnapshotRequestorTag] requestor, found := p.Tags[uploader.SnapshotRequestorTag]
if !found { if !found {
continue continue

View File

@ -113,6 +113,7 @@ func (kp *kopiaProvider) Close(ctx context.Context) error {
func (kp *kopiaProvider) RunBackup( func (kp *kopiaProvider) RunBackup(
ctx context.Context, ctx context.Context,
path string, path string,
realSource string,
tags map[string]string, tags map[string]string,
forceFull bool, forceFull bool,
parentSnapshot string, parentSnapshot string,
@ -121,8 +122,13 @@ func (kp *kopiaProvider) RunBackup(
return "", false, errors.New("Need to initial backup progress updater first") return "", false, errors.New("Need to initial backup progress updater first")
} }
if path == "" {
return "", false, errors.New("path is empty")
}
log := kp.log.WithFields(logrus.Fields{ log := kp.log.WithFields(logrus.Fields{
"path": path, "path": path,
"realSource": realSource,
"parentSnapshot": parentSnapshot, "parentSnapshot": parentSnapshot,
}) })
repoWriter := kopia.NewShimRepo(kp.bkRepo) repoWriter := kopia.NewShimRepo(kp.bkRepo)
@ -146,7 +152,7 @@ func (kp *kopiaProvider) RunBackup(
tags[uploader.SnapshotRequestorTag] = kp.requestorType tags[uploader.SnapshotRequestorTag] = kp.requestorType
tags[uploader.SnapshotUploaderTag] = uploader.KopiaType tags[uploader.SnapshotUploaderTag] = uploader.KopiaType
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, forceFull, parentSnapshot, tags, log) snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log)
if err != nil { if err != nil {
if kpUploader.IsCanceled() { if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled") log.Error("Kopia backup is canceled")

View File

@ -40,26 +40,26 @@ func TestRunBackup(t *testing.T) {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()} updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
testCases := []struct { testCases := []struct {
name string name string
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
notError bool notError bool
}{ }{
{ {
name: "success to backup", name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil return &uploader.SnapshotInfo{}, false, nil
}, },
notError: true, notError: true,
}, },
{ {
name: "get error to backup", name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup") return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
}, },
notError: false, notError: false,
}, },
{ {
name: "got empty snapshot", name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty") return nil, true, errors.New("snapshot is empty")
}, },
notError: false, notError: false,
@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
BackupFunc = tc.hookBackupFunc BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", nil, false, "", &updater) _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", &updater)
if tc.notError { if tc.notError {
assert.NoError(t, err) assert.NoError(t, err)
} else { } else {

View File

@ -29,30 +29,30 @@ func (_m *Provider) Close(ctx context.Context) error {
return r0 return r0
} }
// RunBackup provides a mock function with given fields: ctx, path, tags, forceFull, parentSnapshot, updater // RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) { func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, tags, forceFull, parentSnapshot, updater) ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
var r0 string var r0 string
var r1 bool var r1 bool
var r2 error var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok {
return rf(ctx, path, tags, forceFull, parentSnapshot, updater) return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} }
if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok { if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok {
r0 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) r0 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} else { } else {
r0 = ret.Get(0).(string) r0 = ret.Get(0).(string)
} }
if rf, ok := ret.Get(1).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok { if rf, ok := ret.Get(1).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok {
r1 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) r1 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} else { } else {
r1 = ret.Get(1).(bool) r1 = ret.Get(1).(bool)
} }
if rf, ok := ret.Get(2).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok { if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok {
r2 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} else { } else {
r2 = ret.Error(2) r2 = ret.Error(2)
} }

View File

@ -44,6 +44,7 @@ type Provider interface {
RunBackup( RunBackup(
ctx context.Context, ctx context.Context,
path string, path string,
realSource string,
tags map[string]string, tags map[string]string,
forceFull bool, forceFull bool,
parentSnapshot string, parentSnapshot string,

View File

@ -112,6 +112,7 @@ func (rp *resticProvider) Close(ctx context.Context) error {
func (rp *resticProvider) RunBackup( func (rp *resticProvider) RunBackup(
ctx context.Context, ctx context.Context,
path string, path string,
realSource string,
tags map[string]string, tags map[string]string,
forceFull bool, forceFull bool,
parentSnapshot string, parentSnapshot string,
@ -120,6 +121,14 @@ func (rp *resticProvider) RunBackup(
return "", false, errors.New("Need to initial backup progress updater first") return "", false, errors.New("Need to initial backup progress updater first")
} }
if path == "" {
return "", false, errors.New("path is empty")
}
if realSource != "" {
return "", false, errors.New("real source is not empty, this is not supported by restic uploader")
}
log := rp.log.WithFields(logrus.Fields{ log := rp.log.WithFields(logrus.Fields{
"path": path, "path": path,
"parentSnapshot": parentSnapshot, "parentSnapshot": parentSnapshot,

View File

@ -64,7 +64,7 @@ func TestResticRunBackup(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
ResticBackupCMDFunc = tc.hookBackupFunc ResticBackupCMDFunc = tc.hookBackupFunc
_, _, err := rp.RunBackup(context.Background(), "var", nil, false, "", &updater) _, _, err := rp.RunBackup(context.Background(), "var", "", nil, false, "", &updater)
rp.log.Infof("test name %v error %v", tc.name, err) rp.log.Infof("test name %v error %v", tc.name, err)
require.Equal(t, true, tc.errorHandleFunc(err)) require.Equal(t, true, tc.errorHandleFunc(err))
}) })