update du/dd progress on completion

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/8608/head
Lyndon-Li 2025-01-07 16:01:24 +08:00
parent dce97770cd
commit 411469b90c
10 changed files with 104 additions and 58 deletions

View File

@ -0,0 +1 @@
Fix issue #8497, update du/dd progress on completion

View File

@ -181,7 +181,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
fs.wgDataPath.Done()
}()
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)
if err == provider.ErrorCanceled {
@ -193,7 +193,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}})
}
}()
@ -215,7 +215,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
fs.wgDataPath.Done()
}()
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@ -226,7 +226,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
}
}()

View File

@ -320,7 +320,9 @@ func (ms *microServiceBRWatcher) startWatch() {
logger.Info("Calling callback on data path pod termination")
if lastPod.Status.Phase == v1.PodSucceeded {
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log))
result := funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, getCompletionProgressFromResult(ms.taskType, result))
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, result)
} else {
if strings.HasSuffix(terminateMessage, ErrCancelled) {
ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName)
@ -390,6 +392,19 @@ func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader
return progress
}
func getCompletionProgressFromResult(taskType string, result Result) *uploader.Progress {
progress := &uploader.Progress{}
if taskType == TaskTypeBackup {
progress.BytesDone = result.Backup.TotalBytes
progress.TotalBytes = result.Backup.TotalBytes
} else {
progress.BytesDone = result.Restore.TotalBytes
progress.TotalBytes = result.Restore.TotalBytes
}
return progress
}
func (ms *microServiceBRWatcher) Cancel() {
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
}

View File

@ -33,11 +33,13 @@ type BackupResult struct {
SnapshotID string `json:"snapshotID"`
EmptySnapshot bool `json:"emptySnapshot"`
Source AccessPoint `json:"source,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}
// RestoreResult represents the result of a restore
type RestoreResult struct {
Target AccessPoint `json:"target,omitempty"`
Target AccessPoint `json:"target,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}
// Callbacks defines the collection of callbacks during backup/restore

View File

@ -120,13 +120,13 @@ func (kp *kopiaProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error) {
updater uploader.ProgressUpdater) (string, bool, int64, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
return "", false, 0, errors.New("Need to initial backup progress updater first")
}
if path == "" {
return "", false, errors.New("path is empty")
return "", false, 0, errors.New("path is empty")
}
log := kp.log.WithFields(logrus.Fields{
@ -175,9 +175,9 @@ func (kp *kopiaProvider) RunBackup(
if kpUploader.IsCanceled() {
log.Warn("Kopia backup is canceled")
return snapshotID, false, ErrorCanceled
return snapshotID, false, 0, ErrorCanceled
}
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
return snapshotID, false, 0, errors.Wrapf(err, "Failed to run kopia backup")
}
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
@ -189,7 +189,7 @@ func (kp *kopiaProvider) RunBackup(
)
log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, false, nil
return snapshotInfo.ID, false, snapshotInfo.Size, nil
}
func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
@ -211,7 +211,7 @@ func (kp *kopiaProvider) RunRestore(
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) error {
updater uploader.ProgressUpdater) (int64, error) {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
@ -235,12 +235,12 @@ func (kp *kopiaProvider) RunRestore(
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel)
if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")
return 0, errors.Wrapf(err, "Failed to run kopia restore")
}
if atomic.LoadInt32(&kp.canceling) == 1 {
log.Error("Kopia restore is canceled")
return ErrorCanceled
return 0, ErrorCanceled
}
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
@ -253,5 +253,5 @@ func (kp *kopiaProvider) RunRestore(
log.Info(output)
return nil
return size, nil
}

View File

@ -106,7 +106,7 @@ func TestRunBackup(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
_, _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
@ -157,7 +157,7 @@ func TestRunRestore(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
_, err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.20.0. DO NOT EDIT.
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
@ -19,6 +19,10 @@ type Provider struct {
func (_m *Provider) Close(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
@ -30,13 +34,18 @@ func (_m *Provider) Close(ctx context.Context) error {
}
// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, updater uploader.ProgressUpdater) (string, bool, error) {
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, updater uploader.ProgressUpdater) (string, bool, int64, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
if len(ret) == 0 {
panic("no return value specified for RunBackup")
}
var r0 string
var r1 bool
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (string, bool, error)); ok {
var r2 int64
var r3 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (string, bool, int64, error)); ok {
return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) string); ok {
@ -51,36 +60,55 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin
r1 = ret.Get(1).(bool)
}
if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok {
if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) int64); ok {
r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r2 = ret.Error(2)
r2 = ret.Get(2).(int64)
}
return r0, r1, r2
if rf, ok := ret.Get(3).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok {
r3 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r3 = ret.Error(3)
}
return r0, r1, r2, r3
}
// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, volMode, uploaderConfig, updater
func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, uploaderConfig map[string]string, updater uploader.ProgressUpdater) error {
func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, uploaderConfig map[string]string, updater uploader.ProgressUpdater) (int64, error) {
ret := _m.Called(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok {
r0 = rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater)
} else {
r0 = ret.Error(0)
if len(ret) == 0 {
panic("no return value specified for RunRestore")
}
return r0
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (int64, error)); ok {
return rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) int64); ok {
r0 = rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater)
} else {
r0 = ret.Get(0).(int64)
}
type mockConstructorTestingTNewProvider interface {
mock.TestingT
Cleanup(func())
if rf, ok := ret.Get(1).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok {
r1 = rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewProvider(t mockConstructorTestingTNewProvider) *Provider {
// The first argument is typically a *testing.T value.
func NewProvider(t interface {
mock.TestingT
Cleanup(func())
}) *Provider {
mock := &Provider{}
mock.Mock.Test(t)

View File

@ -50,7 +50,7 @@ type Provider interface {
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error)
updater uploader.ProgressUpdater) (string, bool, int64, error)
// RunRestore which will do restore for one specific volume with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party
RunRestore(
@ -59,7 +59,7 @@ type Provider interface {
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderConfig map[string]string,
updater uploader.ProgressUpdater) error
updater uploader.ProgressUpdater) (int64, error)
// Close which will close related repository
Close(ctx context.Context) error
}

View File

@ -124,21 +124,21 @@ func (rp *resticProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error) {
updater uploader.ProgressUpdater) (string, bool, int64, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
return "", false, 0, errors.New("Need to initial backup progress updater first")
}
if path == "" {
return "", false, errors.New("path is empty")
return "", false, 0, errors.New("path is empty")
}
if realSource != "" {
return "", false, errors.New("real source is not empty, this is not supported by restic uploader")
return "", false, 0, errors.New("real source is not empty, this is not supported by restic uploader")
}
if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to support block mode")
return "", false, 0, errors.New("unable to support block mode")
}
log := rp.log.WithFields(logrus.Fields{
@ -149,7 +149,7 @@ func (rp *resticProvider) RunBackup(
if len(uploaderCfg) > 0 {
parallelFilesUpload, err := uploaderutil.GetParallelFilesUpload(uploaderCfg)
if err != nil {
return "", false, errors.Wrap(err, "failed to get uploader config")
return "", false, 0, errors.Wrap(err, "failed to get uploader config")
}
if parallelFilesUpload > 0 {
log.Warnf("ParallelFilesUpload is set to %d, but restic does not support parallel file uploads. Ignoring.", parallelFilesUpload)
@ -171,9 +171,9 @@ func (rp *resticProvider) RunBackup(
if err != nil {
if strings.Contains(stderrBuf, "snapshot is empty") {
log.Debugf("Restic backup got empty dir with %s path", path)
return "", true, nil
return "", true, 0, nil
}
return "", false, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
return "", false, 0, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
}
// GetSnapshotID
snapshotIDCmd := resticGetSnapshotFunc(rp.repoIdentifier, rp.credentialsFile, tags)
@ -184,10 +184,10 @@ func (rp *resticProvider) RunBackup(
}
snapshotID, err := resticGetSnapshotIDFunc(snapshotIDCmd)
if err != nil {
return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
return "", false, 0, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
}
log.Infof("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf)
return snapshotID, false, nil
return snapshotID, false, 0, nil
}
// RunRestore runs a `restore` command and monitors the volume size to
@ -198,9 +198,9 @@ func (rp *resticProvider) RunRestore(
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) error {
updater uploader.ProgressUpdater) (int64, error) {
if updater == nil {
return errors.New("Need to initial backup progress updater first")
return 0, errors.New("Need to initial backup progress updater first")
}
log := rp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
@ -208,7 +208,7 @@ func (rp *resticProvider) RunRestore(
})
if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to support block mode")
return 0, errors.New("unable to support block mode")
}
restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
@ -220,7 +220,7 @@ func (rp *resticProvider) RunRestore(
extraFlags, err := rp.parseRestoreExtraFlags(uploaderCfg)
if err != nil {
return errors.Wrap(err, "failed to parse uploader config")
return 0, errors.Wrap(err, "failed to parse uploader config")
} else if len(extraFlags) != 0 {
restoreCmd.ExtraFlags = append(restoreCmd.ExtraFlags, extraFlags...)
}
@ -228,7 +228,7 @@ func (rp *resticProvider) RunRestore(
stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater)
log.Infof("Run command=%v, stdout=%s, stderr=%s", restoreCmd, stdout, stderr)
return err
return 0, err
}
func (rp *resticProvider) parseRestoreExtraFlags(uploaderCfg map[string]string) ([]string, error) {

View File

@ -150,9 +150,9 @@ func TestResticRunBackup(t *testing.T) {
}
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, &updater)
_, _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, &updater)
} else {
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, nil)
_, _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)
@ -223,9 +223,9 @@ func TestResticRunRestore(t *testing.T) {
var err error
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, &updater)
_, err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, &updater)
} else {
err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, nil)
_, err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)