diff --git a/pkg/apis/velero/v1/pod_volume_operation_progress.go b/pkg/apis/velero/v1/pod_volume_operation_progress.go index 4461a7e7a..ceb67a87e 100644 --- a/pkg/apis/velero/v1/pod_volume_operation_progress.go +++ b/pkg/apis/velero/v1/pod_volume_operation_progress.go @@ -16,13 +16,6 @@ limitations under the License. package v1 -import ( - "context" - - "github.com/sirupsen/logrus" - "sigs.k8s.io/controller-runtime/pkg/client" -) - // PodVolumeOperationProgress represents the progress of a // PodVolumeBackup/Restore (restic) operation type PodVolumeOperationProgress struct { @@ -32,51 +25,3 @@ type PodVolumeOperationProgress struct { // +optional BytesDone int64 `json:"bytesDone,omitempty"` } - -type BackupProgressUpdater struct { - pvb *PodVolumeBackup - log logrus.FieldLogger - ctx context.Context - cli client.Client -} - -type RestoreProgressUpdater struct { - pvr *PodVolumeRestore - log logrus.FieldLogger - ctx context.Context - cli client.Client -} - -func NewBackupProgressUpdater(pvb *PodVolumeBackup, log logrus.FieldLogger, ctx context.Context, cli client.Client) *BackupProgressUpdater { - return &BackupProgressUpdater{pvb, log, ctx, cli} -} - -//UpdateProgress which implement ProgressUpdater to update pvb progress status -func (b *BackupProgressUpdater) UpdateProgress(p *UploaderProgress) { - original := b.pvb.DeepCopy() - b.pvb.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone} - if b.cli == nil { - b.log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume) - return - } - if err := b.cli.Patch(b.ctx, b.pvb, client.MergeFrom(original)); err != nil { - b.log.Errorf("update backup pod %s volume %s progress with %v", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume, err) - } -} - -func NewRestoreProgressUpdater(pvr *PodVolumeRestore, log logrus.FieldLogger, ctx context.Context, cli client.Client) *RestoreProgressUpdater { - return &RestoreProgressUpdater{pvr, log, ctx, cli} -} - -//UpdateProgress which implement ProgressUpdater to update update pvb progress status -func (r *RestoreProgressUpdater) UpdateProgress(p *UploaderProgress) { - original := r.pvr.DeepCopy() - r.pvr.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone} - if r.cli == nil { - r.log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume) - return - } - if err := r.cli.Patch(r.ctx, r.pvr, client.MergeFrom(original)); err != nil { - r.log.Errorf("update restore pod %s volume %s progress with %v", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume, err) - } -} diff --git a/pkg/apis/velero/v1/progress.go b/pkg/apis/velero/v1/progress.go deleted file mode 100644 index cf5d42b1d..000000000 --- a/pkg/apis/velero/v1/progress.go +++ /dev/null @@ -1,26 +0,0 @@ -/* -Copyright The Velero Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package v1 - -type UploaderProgress struct { - TotalBytes int64 `json:"totalBytes,omitempty"` - BytesDone int64 `json:"doneBytes,omitempty"` -} - -type ProgressUpdater interface { - UpdateProgress(p *UploaderProgress) -} diff --git a/pkg/cmd/util/output/backup_describer.go b/pkg/cmd/util/output/backup_describer.go index 1ec092831..2bc742c9f 100644 --- a/pkg/cmd/util/output/backup_describer.go +++ b/pkg/cmd/util/output/backup_describer.go @@ -486,7 +486,7 @@ func (v *volumesByPod) Add(namespace, name, volume, phase string, progress veler key := fmt.Sprintf("%s/%s", namespace, name) // append backup progress percentage if backup is in progress - if phase == "In Progress" && progress != (velerov1api.PodVolumeOperationProgress{}) { + if phase == "In Progress" && progress.TotalBytes != 0 { volume = fmt.Sprintf("%s (%.2f%%)", volume, float64(progress.BytesDone)/float64(progress.TotalBytes)*100) } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 3b2118926..635ef00a7 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -38,6 +38,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/metrics" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) @@ -61,6 +62,13 @@ type PodVolumeBackupReconciler struct { Log logrus.FieldLogger } +type BackupProgressUpdater struct { + PodVolumeBackup *velerov1api.PodVolumeBackup + Log logrus.FieldLogger + Ctx context.Context + Cli client.Client +} + // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups/status,verbs=get;update;patch func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -264,6 +272,18 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l return mostRecentPVB.Status.SnapshotID } +// updateBackupProgressFunc returns a func that takes progress info and patches +// the PVB with the new progress. +func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { + return func(progress velerov1api.PodVolumeOperationProgress) { + original := pvb.DeepCopy() + pvb.Status.Progress = progress + if err := r.Client.Patch(context.Background(), pvb, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error update progress") + } + } +} + func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { original := pvb.DeepCopy() pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed @@ -352,3 +372,20 @@ func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log return cmd, nil } + +func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger, ctx context.Context) *BackupProgressUpdater { + return &BackupProgressUpdater{pvb, log, ctx, r.Client} +} + +//UpdateProgress which implement ProgressUpdater interface to update pvb progress status +func (b *BackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) { + original := b.PodVolumeBackup.DeepCopy() + b.PodVolumeBackup.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone} + if b.Cli == nil { + b.Log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume) + return + } + if err := b.Cli.Patch(b.Ctx, b.PodVolumeBackup, client.MergeFrom(original)); err != nil { + b.Log.Errorf("update backup pod %s volume %s progress with %v", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume, err) + } +} diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 70cf1a53d..00853710f 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -41,6 +41,7 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/boolptr" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" @@ -64,6 +65,13 @@ type PodVolumeRestoreReconciler struct { clock clock.Clock } +type RestoreProgressUpdater struct { + PodVolumeRestore *velerov1api.PodVolumeRestore + Log logrus.FieldLogger + Ctx context.Context + Cli client.Client +} + // +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch // +kubebuilder:rbac:groups="",resources=pods,verbs=get @@ -317,3 +325,32 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve return nil } + +// updateRestoreProgressFunc returns a func that takes progress info and patches +// the PVR with the new progress +func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { + return func(progress velerov1api.PodVolumeOperationProgress) { + original := req.DeepCopy() + req.Status.Progress = progress + if err := c.Patch(context.Background(), req, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("Unable to update PodVolumeRestore progress") + } + } +} + +func (r *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger, ctx context.Context) *RestoreProgressUpdater { + return &RestoreProgressUpdater{pvr, log, ctx, r.Client} +} + +//UpdateProgress which implement ProgressUpdater interface to update pvr progress status +func (r *RestoreProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) { + original := r.PodVolumeRestore.DeepCopy() + r.PodVolumeRestore.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone} + if r.Cli == nil { + r.Log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.PodVolumeRestore.Spec.Pod.Name, r.PodVolumeRestore.Spec.Volume) + return + } + if err := r.Cli.Patch(r.Ctx, r.PodVolumeRestore, client.MergeFrom(original)); err != nil { + r.Log.Errorf("update restore pod %s volume %s progress with %v", r.PodVolumeRestore.Spec.Pod.Name, r.PodVolumeRestore.Spec.Volume, err) + } +} diff --git a/pkg/uploader/kopia/progress.go b/pkg/uploader/kopia/progress.go index 7da518eb5..d76818126 100644 --- a/pkg/uploader/kopia/progress.go +++ b/pkg/uploader/kopia/progress.go @@ -20,7 +20,7 @@ import ( "sync/atomic" "time" - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/uploader" ) //Throttle throttles controlle the interval of output result @@ -60,9 +60,9 @@ type KopiaProgress struct { estimatedFileCount int32 // +checklocksignore the total count of files to be processed estimatedTotalBytes int64 // +checklocksignore the total size of files to be processed // +checkatomic - processedBytes int64 // which statistic all bytes has been processed currently - outputThrottle Throttle // which control the frequency of update progress - Updater velerov1api.ProgressUpdater //which the kopia progress will call the UpdateProgress, the third party will implement the interface to update progress + processedBytes int64 // which statistic all bytes has been processed currently + outputThrottle Throttle // which control the frequency of update progress + Updater uploader.ProgressUpdater //which kopia progress will call the UpdateProgress interface, the third party will implement the interface to do the progress update } //UploadedBytes the total bytes has uploaded currently @@ -90,10 +90,10 @@ func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) { p.UpdateProgress() } -//UpdateProgress which called by UpdateProgress func, it is used to update pvb or pvr status +//UpdateProgress which calls Updater UpdateProgress interface, update progress by third-party implementation func (p *KopiaProgress) UpdateProgress() { if p.outputThrottle.ShouldOutput() { - p.Updater.UpdateProgress(&velerov1api.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes}) + p.Updater.UpdateProgress(&uploader.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes}) } } diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index 5b6e338b4..c3ab2d10e 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -18,7 +18,6 @@ package kopia import ( "context" - "fmt" "math" "os" "path/filepath" @@ -28,7 +27,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service" + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/kopia/kopia/fs" @@ -86,7 +85,7 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { if fsUploader == nil { - return nil, fmt.Errorf("get empty kopia uploader") + return nil, errors.New("get empty kopia uploader") } dir, err := filepath.Abs(sourcePath) if err != nil { @@ -94,9 +93,11 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep } sourceInfo := snapshot.SourceInfo{ - Path: filepath.Clean(dir), + UserName: udmrepo.GetRepoUser(), + Host: udmrepo.GetRepoDomain(), + Path: filepath.Clean(dir), } - sourceInfo.UserName, sourceInfo.Host = service.GetRepoUser() + rootDir, err := getLocalFSEntry(sourceInfo.Path) if err != nil { return nil, errors.Wrap(err, "Unable to get local filesystem entry") diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 3a60658c9..a5b6d81fb 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/mock" repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks" - "github.com/vmware-tanzu/velero/pkg/uploader" uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks" ) @@ -187,7 +186,7 @@ func TestSnapshotSource(t *testing.T) { t.Run(tc.name, func(t *testing.T) { s := InjectSnapshotFuncs() MockFuncs(s, tc.args) - _, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource", func(up uploader.UploaderProgress) {}) + _, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource") if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 6bdd1a446..6384142ef 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -20,12 +20,12 @@ import ( "context" "fmt" "strings" - "time" "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/kopia" "github.com/vmware-tanzu/velero/internal/credentials" @@ -40,20 +40,17 @@ var BackupFunc = kopia.Backup var RestoreFunc = kopia.Restore //kopiaProvider recorded info related with kopiaProvider -//action which means provider handle backup or restore type kopiaProvider struct { - bkRepo udmrepo.BackupRepo - credGetter *credentials.CredentialGetter - uploader *snapshotfs.Uploader - restoreCancel chan struct{} - log logrus.FieldLogger + bkRepo udmrepo.BackupRepo + credGetter *credentials.CredentialGetter + log logrus.FieldLogger } //NewKopiaUploaderProvider initialized with open or create a repository func NewKopiaUploaderProvider( ctx context.Context, credGetter *credentials.CredentialGetter, - bsl *velerov1api.BackupStorageLocation, + backupRepo *velerov1api.BackupRepository, log logrus.FieldLogger, ) (Provider, error) { kp := &kopiaProvider{ @@ -61,7 +58,7 @@ func NewKopiaUploaderProvider( credGetter: credGetter, } //repoUID which is used to generate kopia repository config with unique directory path - repoUID := string(bsl.GetUID()) + repoUID := string(backupRepo.GetUID()) repoOpt, err := udmrepo.NewRepoOptions( udmrepo.WithPassword(kp, ""), udmrepo.WithConfigFile("", repoUID), @@ -81,24 +78,22 @@ func NewKopiaUploaderProvider( return kp, nil } -//CheckContext check context status periodically -//check if context is timeout or cancel -func (kp *kopiaProvider) CheckContext(ctx context.Context) { - for { - select { - case <-ctx.Done(): - if kp.uploader != nil { - kp.uploader.Cancel() - kp.log.Infof("Backup is been canceled") - } - if kp.restoreCancel != nil { - close(kp.restoreCancel) - kp.log.Infof("Restore is been canceled") - } - return - default: - time.Sleep(time.Second * 10) +//CheckContext check context status check if context is timeout or cancel and backup restore once finished it will quit and return +func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struct{}, restoreChan chan struct{}, uploader *snapshotfs.Uploader) { + select { + case <-finishChan: + kp.log.Infof("Action finished") + return + case <-ctx.Done(): + if uploader != nil { + uploader.Cancel() + kp.log.Infof("Backup is been canceled") } + if restoreChan != nil { + close(restoreChan) + kp.log.Infof("Restore is been canceled") + } + return } } @@ -106,15 +101,15 @@ func (kp *kopiaProvider) Close(ctx context.Context) { kp.bkRepo.Close(ctx) } -//RunBackup which will backup specific path and update backup progress in pvb status +//RunBackup which will backup specific path and update backup progress func (kp *kopiaProvider) RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater velerov1api.ProgressUpdater) (string, error) { + updater uploader.ProgressUpdater) (string, error) { if updater == nil { - return "", errors.New("Need to inital backup progress updater first") + return "", errors.New("Need to initial backup progress updater first") } log := kp.log.WithFields(logrus.Fields{ @@ -122,25 +117,29 @@ func (kp *kopiaProvider) RunBackup( "parentSnapshot": parentSnapshot, }) repoWriter := kopia.NewShimRepo(kp.bkRepo) - kp.uploader = snapshotfs.NewUploader(repoWriter) + kpUploader := snapshotfs.NewUploader(repoWriter) prorgess := new(kopia.KopiaProgress) prorgess.InitThrottle(backupProgressCheckInterval) prorgess.Updater = updater - kp.uploader.Progress = prorgess - + kpUploader.Progress = prorgess + quit := make(chan struct{}) log.Info("Starting backup") - go kp.CheckContext(ctx) + go kp.CheckContext(ctx, quit, nil, kpUploader) - snapshotInfo, err := BackupFunc(ctx, kp.uploader, repoWriter, path, parentSnapshot, log) + defer func() { + close(quit) + }() + + snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log) if err != nil { return "", errors.Wrapf(err, "Failed to run kopia backup") } else if snapshotInfo == nil { return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) } - + // which ensure that the statistic data of TotalBytes equal to BytesDone when finished updater.UpdateProgress( - &velerov1api.UploaderProgress{ + &uploader.UploaderProgress{ TotalBytes: snapshotInfo.Size, BytesDone: snapshotInfo.Size, }, @@ -162,12 +161,12 @@ func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) { return strings.TrimSpace(rawPass), nil } -//RunRestore which will restore specific path and update restore progress in pvr status +//RunRestore which will restore specific path and update restore progress func (kp *kopiaProvider) RunRestore( ctx context.Context, snapshotID string, volumePath string, - updater velerov1api.ProgressUpdater) error { + updater uploader.ProgressUpdater) error { log := kp.log.WithFields(logrus.Fields{ "snapshotID": snapshotID, "volumePath": volumePath, @@ -176,22 +175,27 @@ func (kp *kopiaProvider) RunRestore( prorgess := new(kopia.KopiaProgress) prorgess.InitThrottle(restoreProgressCheckInterval) prorgess.Updater = updater - kp.restoreCancel = make(chan struct{}) - defer func() { - if kp.restoreCancel != nil { - close(kp.restoreCancel) - } - }() + restoreCancel := make(chan struct{}) + quit := make(chan struct{}) log.Info("Starting restore") - go kp.CheckContext(ctx) - size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, kp.restoreCancel) + go kp.CheckContext(ctx, quit, restoreCancel, nil) + + defer func() { + if restoreCancel != nil { + close(restoreCancel) + } + close(quit) + }() + + size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, restoreCancel) if err != nil { return errors.Wrapf(err, "Failed to run kopia restore") } - updater.UpdateProgress(&velerov1api.UploaderProgress{ + // which ensure that the statistic data of TotalBytes equal to BytesDone when finished + updater.UpdateProgress(&uploader.UploaderProgress{ TotalBytes: size, BytesDone: size, }) diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 3656053d0..747acbfb4 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/controller" "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/kopia" @@ -36,8 +37,7 @@ import ( func TestRunBackup(t *testing.T) { var kp kopiaProvider kp.log = logrus.New() - fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme) - updater := velerov1api.NewBackupProgressUpdater(&velerov1api.PodVolumeBackup{}, kp.log, context.Background(), fakeClient) + updater := controller.BackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} testCases := []struct { name string hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) @@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { BackupFunc = tc.hookBackupFunc - _, err := kp.RunBackup(context.Background(), "var", nil, "", updater) + _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater) if tc.notError { assert.NoError(t, err) } else { @@ -81,7 +81,7 @@ func TestRunBackup(t *testing.T) { func TestRunRestore(t *testing.T) { var kp kopiaProvider kp.log = logrus.New() - updater := velerov1api.NewRestoreProgressUpdater(&velerov1api.PodVolumeRestore{}, kp.log, context.Background(), fake.NewFakeClientWithScheme(scheme.Scheme)) + updater := controller.RestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} testCases := []struct { name string @@ -107,7 +107,7 @@ func TestRunRestore(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { RestoreFunc = tc.hookRestoreFunc - err := kp.RunRestore(context.Background(), "", "/var", updater) + err := kp.RunRestore(context.Background(), "", "/var", &updater) if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index c6fe0a177..b435708f6 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -34,37 +34,38 @@ const backupProgressCheckInterval = 10 * time.Second // Provider which is designed for one pod volumn to do the backup or restore type Provider interface { // RunBackup which will do backup for one specific volumn and return snapshotID error - // updater which is used for update backup progress into related pvb status + // updater is used for updating backup progress which implement by third-party RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater velerov1api.ProgressUpdater) (string, error) + updater uploader.ProgressUpdater) (string, error) // RunRestore which will do restore for one specific volumn with given snapshot id and return error - // updateFunc which is used for update restore progress into related pvr status + // updater is used for updating backup progress which implement by third-party RunRestore( ctx context.Context, snapshotID string, volumePath string, - updater velerov1api.ProgressUpdater) error + updater uploader.ProgressUpdater) error // Close which will close related repository Close(ctx context.Context) } -//NewUploaderProvider initialize provider with specific uploader_type +// NewUploaderProvider initialize provider with specific uploaderType func NewUploaderProvider( ctx context.Context, - uploader_type string, + uploaderType string, repoIdentifier string, bsl *velerov1api.BackupStorageLocation, + backupReo *velerov1api.BackupRepository, credGetter *credentials.CredentialGetter, repoKeySelector *v1.SecretKeySelector, log logrus.FieldLogger, ) (Provider, error) { - if uploader_type == uploader.KopiaType { + if uploaderType == uploader.KopiaType { return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log) } else { - return NewKopiaUploaderProvider(ctx, credGetter, bsl, log) + return NewKopiaUploaderProvider(ctx, credGetter, backupReo, log) } } diff --git a/pkg/uploader/types.go b/pkg/uploader/types.go index 4f5274a97..015a2c156 100644 --- a/pkg/uploader/types.go +++ b/pkg/uploader/types.go @@ -40,3 +40,14 @@ type SnapshotInfo struct { ID string `json:"id"` Size int64 `json:"Size"` } + +//UploaderProgress which defined two variables to record progress +type UploaderProgress struct { + TotalBytes int64 `json:"totalBytes,omitempty"` + BytesDone int64 `json:"doneBytes,omitempty"` +} + +//UploaderProgress which defined generic interface to update progress +type ProgressUpdater interface { + UpdateProgress(p *UploaderProgress) +}