diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 5e687b673..551669c09 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -227,7 +227,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du * return r.errorOut(ctx, du, err, "error to initialize data path", log) } log.WithField("path", path.ByPath).Info("fs init") - if err := fsBackup.StartBackup(path, "", false, nil); err != nil { + if err := fsBackup.StartBackup(path, fmt.Sprintf("%s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, nil); err != nil { return r.errorOut(ctx, du, err, "error starting data path backup", log) } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 7c98c788d..0515a05a4 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -232,7 +232,7 @@ func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNam return nil } -func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { +func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error { du := f.du original := f.du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 71fb4850f..3507a49ea 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -178,7 +178,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } } - if err := fsBackup.StartBackup(path, parentSnapshotID, false, pvb.Spec.Tags); err != nil { + if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags); err != nil { return r.errorOut(ctx, &pvb, err, "error starting data path backup", log) } diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index 1f1e9e543..a7ec9f59c 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -103,7 +103,7 @@ func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace str return nil } -func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { +func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error { pvb := b.pvb original := b.pvb.DeepCopy() diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index ce0e8dc0a..4e4b16aee 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -129,13 +129,13 @@ func (fs *fileSystemBR) Close(ctx context.Context) { fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") } -func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { +func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error { if !fs.initialized { return errors.New("file system data path is not initialized") } go func() { - snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs) + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index 19c36f6de..072f1996c 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -95,12 +95,12 @@ func TestAsyncBackup(t *testing.T) { t.Run(test.name, func(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) - mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks - err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", false, nil) + err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil) require.Equal(t, nil, err) <-finish diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index 0b1e47c12..4f85fceec 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -61,7 +61,7 @@ type AsyncBR interface { Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error // StartBackup starts an asynchronous data path instance for backup - StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error + StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error // StartRestore starts an asynchronous data path instance for restore StartRestore(snapshotID string, target AccessPoint) error diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index 96d2ddc1d..aa8970a65 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -84,7 +84,7 @@ func setupDefaultPolicy() *policy.Tree { } // Backup backup specific sourcePath and update progress -func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, +func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { if fsUploader == nil { return nil, false, errors.New("get empty kopia uploader") @@ -102,12 +102,18 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep return nil, true, nil } + dir = filepath.Clean(dir) + sourceInfo := snapshot.SourceInfo{ UserName: udmrepo.GetRepoUser(), Host: udmrepo.GetRepoDomain(), - Path: filepath.Clean(dir), + Path: filepath.Clean(realSource), } - rootDir, err := getLocalFSEntry(sourceInfo.Path) + if sourceInfo.Path == "" { + sourceInfo.Path = dir + } + + rootDir, err := getLocalFSEntry(dir) if err != nil { return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") } @@ -173,6 +179,8 @@ func SnapshotSource( var previous []*snapshot.Manifest if !forceFull { if parentSnapshot != "" { + log.Infof("Using provided parent snapshot %s", parentSnapshot) + mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot)) if err != nil { return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot) @@ -180,13 +188,21 @@ func SnapshotSource( previous = append(previous, mani) } else { - pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil) + log.Infof("Searching for parent snapshot") + + pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil, log) if err != nil { return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo) } previous = pre } + } else { + log.Info("Forcing full snapshot") + } + + for i := range previous { + log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description) } policyTree := setupDefaultPolicy() @@ -237,7 +253,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree) // findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including // last complete snapshot following it. -func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *fs.UTCTimestamp) ([]*snapshot.Manifest, error) { +func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *fs.UTCTimestamp, log logrus.FieldLogger) ([]*snapshot.Manifest, error) { man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo) if err != nil { return nil, err @@ -247,6 +263,8 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour var result []*snapshot.Manifest for _, p := range man { + log.Debugf("Found one snapshot %s, start time %v, incomplete %s, tags %v", p.ID, p.StartTime.ToTime(), p.IncompleteReason, p.Tags) + requestor, found := p.Tags[uploader.SnapshotRequestorTag] if !found { continue diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index f578419a6..44346b9da 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -113,6 +113,7 @@ func (kp *kopiaProvider) Close(ctx context.Context) error { func (kp *kopiaProvider) RunBackup( ctx context.Context, path string, + realSource string, tags map[string]string, forceFull bool, parentSnapshot string, @@ -121,8 +122,13 @@ func (kp *kopiaProvider) RunBackup( return "", false, errors.New("Need to initial backup progress updater first") } + if path == "" { + return "", false, errors.New("path is empty") + } + log := kp.log.WithFields(logrus.Fields{ "path": path, + "realSource": realSource, "parentSnapshot": parentSnapshot, }) repoWriter := kopia.NewShimRepo(kp.bkRepo) @@ -146,7 +152,7 @@ func (kp *kopiaProvider) RunBackup( tags[uploader.SnapshotRequestorTag] = kp.requestorType tags[uploader.SnapshotUploaderTag] = uploader.KopiaType - snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, forceFull, parentSnapshot, tags, log) + snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log) if err != nil { if kpUploader.IsCanceled() { log.Error("Kopia backup is canceled") diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 207a3ed03..6746b7c70 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -40,26 +40,26 @@ func TestRunBackup(t *testing.T) { updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()} testCases := []struct { name string - hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) + hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) notError bool }{ { name: "success to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return &uploader.SnapshotInfo{}, false, nil }, notError: true, }, { name: "get error to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return &uploader.SnapshotInfo{}, false, errors.New("failed to backup") }, notError: false, }, { name: "got empty snapshot", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return nil, true, errors.New("snapshot is empty") }, notError: false, @@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { BackupFunc = tc.hookBackupFunc - _, _, err := kp.RunBackup(context.Background(), "var", nil, false, "", &updater) + _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", &updater) if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/mocks/Provider.go b/pkg/uploader/provider/mocks/Provider.go index dbb52b7fb..54a461ea1 100644 --- a/pkg/uploader/provider/mocks/Provider.go +++ b/pkg/uploader/provider/mocks/Provider.go @@ -29,30 +29,30 @@ func (_m *Provider) Close(ctx context.Context) error { return r0 } -// RunBackup provides a mock function with given fields: ctx, path, tags, forceFull, parentSnapshot, updater -func (_m *Provider) RunBackup(ctx context.Context, path string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) { - ret := _m.Called(ctx, path, tags, forceFull, parentSnapshot, updater) +// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, updater +func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) { + ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, updater) var r0 string var r1 bool var r2 error - if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok { - return rf(ctx, path, tags, forceFull, parentSnapshot, updater) + if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok { + return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater) } - if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok { - r0 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) + if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok { + r0 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok { - r1 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) + if rf, ok := ret.Get(1).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok { + r1 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater) } else { r1 = ret.Get(1).(bool) } - if rf, ok := ret.Get(2).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok { - r2 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) + if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok { + r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater) } else { r2 = ret.Error(2) } diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index f38a69cba..5926609bf 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -44,6 +44,7 @@ type Provider interface { RunBackup( ctx context.Context, path string, + realSource string, tags map[string]string, forceFull bool, parentSnapshot string, diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go index 7b68d2e5b..b9698c5a5 100644 --- a/pkg/uploader/provider/restic.go +++ b/pkg/uploader/provider/restic.go @@ -112,6 +112,7 @@ func (rp *resticProvider) Close(ctx context.Context) error { func (rp *resticProvider) RunBackup( ctx context.Context, path string, + realSource string, tags map[string]string, forceFull bool, parentSnapshot string, @@ -120,6 +121,14 @@ func (rp *resticProvider) RunBackup( return "", false, errors.New("Need to initial backup progress updater first") } + if path == "" { + return "", false, errors.New("path is empty") + } + + if realSource != "" { + return "", false, errors.New("real source is not empty, this is not supported by restic uploader") + } + log := rp.log.WithFields(logrus.Fields{ "path": path, "parentSnapshot": parentSnapshot, diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go index f2ba7a970..e82570a84 100644 --- a/pkg/uploader/provider/restic_test.go +++ b/pkg/uploader/provider/restic_test.go @@ -64,7 +64,7 @@ func TestResticRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ResticBackupCMDFunc = tc.hookBackupFunc - _, _, err := rp.RunBackup(context.Background(), "var", nil, false, "", &updater) + _, _, err := rp.RunBackup(context.Background(), "var", "", nil, false, "", &updater) rp.log.Infof("test name %v error %v", tc.name, err) require.Equal(t, true, tc.errorHandleFunc(err)) })