parent
839c2ed98f
commit
2bf054ad0b
|
@ -16,13 +16,6 @@ limitations under the License.
|
|||
|
||||
package v1
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
// PodVolumeOperationProgress represents the progress of a
|
||||
// PodVolumeBackup/Restore (restic) operation
|
||||
type PodVolumeOperationProgress struct {
|
||||
|
@ -32,51 +25,3 @@ type PodVolumeOperationProgress struct {
|
|||
// +optional
|
||||
BytesDone int64 `json:"bytesDone,omitempty"`
|
||||
}
|
||||
|
||||
type BackupProgressUpdater struct {
|
||||
pvb *PodVolumeBackup
|
||||
log logrus.FieldLogger
|
||||
ctx context.Context
|
||||
cli client.Client
|
||||
}
|
||||
|
||||
type RestoreProgressUpdater struct {
|
||||
pvr *PodVolumeRestore
|
||||
log logrus.FieldLogger
|
||||
ctx context.Context
|
||||
cli client.Client
|
||||
}
|
||||
|
||||
func NewBackupProgressUpdater(pvb *PodVolumeBackup, log logrus.FieldLogger, ctx context.Context, cli client.Client) *BackupProgressUpdater {
|
||||
return &BackupProgressUpdater{pvb, log, ctx, cli}
|
||||
}
|
||||
|
||||
//UpdateProgress which implement ProgressUpdater to update pvb progress status
|
||||
func (b *BackupProgressUpdater) UpdateProgress(p *UploaderProgress) {
|
||||
original := b.pvb.DeepCopy()
|
||||
b.pvb.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
|
||||
if b.cli == nil {
|
||||
b.log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume)
|
||||
return
|
||||
}
|
||||
if err := b.cli.Patch(b.ctx, b.pvb, client.MergeFrom(original)); err != nil {
|
||||
b.log.Errorf("update backup pod %s volume %s progress with %v", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume, err)
|
||||
}
|
||||
}
|
||||
|
||||
func NewRestoreProgressUpdater(pvr *PodVolumeRestore, log logrus.FieldLogger, ctx context.Context, cli client.Client) *RestoreProgressUpdater {
|
||||
return &RestoreProgressUpdater{pvr, log, ctx, cli}
|
||||
}
|
||||
|
||||
//UpdateProgress which implement ProgressUpdater to update update pvb progress status
|
||||
func (r *RestoreProgressUpdater) UpdateProgress(p *UploaderProgress) {
|
||||
original := r.pvr.DeepCopy()
|
||||
r.pvr.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
|
||||
if r.cli == nil {
|
||||
r.log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume)
|
||||
return
|
||||
}
|
||||
if err := r.cli.Patch(r.ctx, r.pvr, client.MergeFrom(original)); err != nil {
|
||||
r.log.Errorf("update restore pod %s volume %s progress with %v", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1
|
||||
|
||||
type UploaderProgress struct {
|
||||
TotalBytes int64 `json:"totalBytes,omitempty"`
|
||||
BytesDone int64 `json:"doneBytes,omitempty"`
|
||||
}
|
||||
|
||||
type ProgressUpdater interface {
|
||||
UpdateProgress(p *UploaderProgress)
|
||||
}
|
|
@ -486,7 +486,7 @@ func (v *volumesByPod) Add(namespace, name, volume, phase string, progress veler
|
|||
key := fmt.Sprintf("%s/%s", namespace, name)
|
||||
|
||||
// append backup progress percentage if backup is in progress
|
||||
if phase == "In Progress" && progress != (velerov1api.PodVolumeOperationProgress{}) {
|
||||
if phase == "In Progress" && progress.TotalBytes != 0 {
|
||||
volume = fmt.Sprintf("%s (%.2f%%)", volume, float64(progress.BytesDone)/float64(progress.TotalBytes)*100)
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
@ -61,6 +62,13 @@ type PodVolumeBackupReconciler struct {
|
|||
Log logrus.FieldLogger
|
||||
}
|
||||
|
||||
type BackupProgressUpdater struct {
|
||||
PodVolumeBackup *velerov1api.PodVolumeBackup
|
||||
Log logrus.FieldLogger
|
||||
Ctx context.Context
|
||||
Cli client.Client
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups/status,verbs=get;update;patch
|
||||
func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
|
@ -264,6 +272,18 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l
|
|||
return mostRecentPVB.Status.SnapshotID
|
||||
}
|
||||
|
||||
// updateBackupProgressFunc returns a func that takes progress info and patches
|
||||
// the PVB with the new progress.
|
||||
func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
|
||||
return func(progress velerov1api.PodVolumeOperationProgress) {
|
||||
original := pvb.DeepCopy()
|
||||
pvb.Status.Progress = progress
|
||||
if err := r.Client.Patch(context.Background(), pvb, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("error update progress")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
|
||||
original := pvb.DeepCopy()
|
||||
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
|
||||
|
@ -352,3 +372,20 @@ func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log
|
|||
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger, ctx context.Context) *BackupProgressUpdater {
|
||||
return &BackupProgressUpdater{pvb, log, ctx, r.Client}
|
||||
}
|
||||
|
||||
//UpdateProgress which implement ProgressUpdater interface to update pvb progress status
|
||||
func (b *BackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {
|
||||
original := b.PodVolumeBackup.DeepCopy()
|
||||
b.PodVolumeBackup.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
|
||||
if b.Cli == nil {
|
||||
b.Log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume)
|
||||
return
|
||||
}
|
||||
if err := b.Cli.Patch(b.Ctx, b.PodVolumeBackup, client.MergeFrom(original)); err != nil {
|
||||
b.Log.Errorf("update backup pod %s volume %s progress with %v", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
|
@ -64,6 +65,13 @@ type PodVolumeRestoreReconciler struct {
|
|||
clock clock.Clock
|
||||
}
|
||||
|
||||
type RestoreProgressUpdater struct {
|
||||
PodVolumeRestore *velerov1api.PodVolumeRestore
|
||||
Log logrus.FieldLogger
|
||||
Ctx context.Context
|
||||
Cli client.Client
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
|
||||
|
@ -317,3 +325,32 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateRestoreProgressFunc returns a func that takes progress info and patches
|
||||
// the PVR with the new progress
|
||||
func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
|
||||
return func(progress velerov1api.PodVolumeOperationProgress) {
|
||||
original := req.DeepCopy()
|
||||
req.Status.Progress = progress
|
||||
if err := c.Patch(context.Background(), req, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("Unable to update PodVolumeRestore progress")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger, ctx context.Context) *RestoreProgressUpdater {
|
||||
return &RestoreProgressUpdater{pvr, log, ctx, r.Client}
|
||||
}
|
||||
|
||||
//UpdateProgress which implement ProgressUpdater interface to update pvr progress status
|
||||
func (r *RestoreProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {
|
||||
original := r.PodVolumeRestore.DeepCopy()
|
||||
r.PodVolumeRestore.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
|
||||
if r.Cli == nil {
|
||||
r.Log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.PodVolumeRestore.Spec.Pod.Name, r.PodVolumeRestore.Spec.Volume)
|
||||
return
|
||||
}
|
||||
if err := r.Cli.Patch(r.Ctx, r.PodVolumeRestore, client.MergeFrom(original)); err != nil {
|
||||
r.Log.Errorf("update restore pod %s volume %s progress with %v", r.PodVolumeRestore.Spec.Pod.Name, r.PodVolumeRestore.Spec.Volume, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
)
|
||||
|
||||
//Throttle throttles controlle the interval of output result
|
||||
|
@ -60,9 +60,9 @@ type KopiaProgress struct {
|
|||
estimatedFileCount int32 // +checklocksignore the total count of files to be processed
|
||||
estimatedTotalBytes int64 // +checklocksignore the total size of files to be processed
|
||||
// +checkatomic
|
||||
processedBytes int64 // which statistic all bytes has been processed currently
|
||||
outputThrottle Throttle // which control the frequency of update progress
|
||||
Updater velerov1api.ProgressUpdater //which the kopia progress will call the UpdateProgress, the third party will implement the interface to update progress
|
||||
processedBytes int64 // which statistic all bytes has been processed currently
|
||||
outputThrottle Throttle // which control the frequency of update progress
|
||||
Updater uploader.ProgressUpdater //which kopia progress will call the UpdateProgress interface, the third party will implement the interface to do the progress update
|
||||
}
|
||||
|
||||
//UploadedBytes the total bytes has uploaded currently
|
||||
|
@ -90,10 +90,10 @@ func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
|
|||
p.UpdateProgress()
|
||||
}
|
||||
|
||||
//UpdateProgress which called by UpdateProgress func, it is used to update pvb or pvr status
|
||||
//UpdateProgress which calls Updater UpdateProgress interface, update progress by third-party implementation
|
||||
func (p *KopiaProgress) UpdateProgress() {
|
||||
if p.outputThrottle.ShouldOutput() {
|
||||
p.Updater.UpdateProgress(&velerov1api.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes})
|
||||
p.Updater.UpdateProgress(&uploader.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@ package kopia
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -28,7 +27,7 @@ import (
|
|||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
|
@ -86,7 +85,7 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn
|
|||
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
|
||||
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
if fsUploader == nil {
|
||||
return nil, fmt.Errorf("get empty kopia uploader")
|
||||
return nil, errors.New("get empty kopia uploader")
|
||||
}
|
||||
dir, err := filepath.Abs(sourcePath)
|
||||
if err != nil {
|
||||
|
@ -94,9 +93,11 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
|
|||
}
|
||||
|
||||
sourceInfo := snapshot.SourceInfo{
|
||||
Path: filepath.Clean(dir),
|
||||
UserName: udmrepo.GetRepoUser(),
|
||||
Host: udmrepo.GetRepoDomain(),
|
||||
Path: filepath.Clean(dir),
|
||||
}
|
||||
sourceInfo.UserName, sourceInfo.Host = service.GetRepoUser()
|
||||
|
||||
rootDir, err := getLocalFSEntry(sourceInfo.Path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Unable to get local filesystem entry")
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
"github.com/stretchr/testify/mock"
|
||||
|
||||
repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks"
|
||||
)
|
||||
|
||||
|
@ -187,7 +186,7 @@ func TestSnapshotSource(t *testing.T) {
|
|||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := InjectSnapshotFuncs()
|
||||
MockFuncs(s, tc.args)
|
||||
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource", func(up uploader.UploaderProgress) {})
|
||||
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource")
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
|
|
|
@ -20,12 +20,12 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
|
@ -40,20 +40,17 @@ var BackupFunc = kopia.Backup
|
|||
var RestoreFunc = kopia.Restore
|
||||
|
||||
//kopiaProvider recorded info related with kopiaProvider
|
||||
//action which means provider handle backup or restore
|
||||
type kopiaProvider struct {
|
||||
bkRepo udmrepo.BackupRepo
|
||||
credGetter *credentials.CredentialGetter
|
||||
uploader *snapshotfs.Uploader
|
||||
restoreCancel chan struct{}
|
||||
log logrus.FieldLogger
|
||||
bkRepo udmrepo.BackupRepo
|
||||
credGetter *credentials.CredentialGetter
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
//NewKopiaUploaderProvider initialized with open or create a repository
|
||||
func NewKopiaUploaderProvider(
|
||||
ctx context.Context,
|
||||
credGetter *credentials.CredentialGetter,
|
||||
bsl *velerov1api.BackupStorageLocation,
|
||||
backupRepo *velerov1api.BackupRepository,
|
||||
log logrus.FieldLogger,
|
||||
) (Provider, error) {
|
||||
kp := &kopiaProvider{
|
||||
|
@ -61,7 +58,7 @@ func NewKopiaUploaderProvider(
|
|||
credGetter: credGetter,
|
||||
}
|
||||
//repoUID which is used to generate kopia repository config with unique directory path
|
||||
repoUID := string(bsl.GetUID())
|
||||
repoUID := string(backupRepo.GetUID())
|
||||
repoOpt, err := udmrepo.NewRepoOptions(
|
||||
udmrepo.WithPassword(kp, ""),
|
||||
udmrepo.WithConfigFile("", repoUID),
|
||||
|
@ -81,24 +78,22 @@ func NewKopiaUploaderProvider(
|
|||
return kp, nil
|
||||
}
|
||||
|
||||
//CheckContext check context status periodically
|
||||
//check if context is timeout or cancel
|
||||
func (kp *kopiaProvider) CheckContext(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if kp.uploader != nil {
|
||||
kp.uploader.Cancel()
|
||||
kp.log.Infof("Backup is been canceled")
|
||||
}
|
||||
if kp.restoreCancel != nil {
|
||||
close(kp.restoreCancel)
|
||||
kp.log.Infof("Restore is been canceled")
|
||||
}
|
||||
return
|
||||
default:
|
||||
time.Sleep(time.Second * 10)
|
||||
//CheckContext check context status check if context is timeout or cancel and backup restore once finished it will quit and return
|
||||
func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struct{}, restoreChan chan struct{}, uploader *snapshotfs.Uploader) {
|
||||
select {
|
||||
case <-finishChan:
|
||||
kp.log.Infof("Action finished")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
if uploader != nil {
|
||||
uploader.Cancel()
|
||||
kp.log.Infof("Backup is been canceled")
|
||||
}
|
||||
if restoreChan != nil {
|
||||
close(restoreChan)
|
||||
kp.log.Infof("Restore is been canceled")
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,15 +101,15 @@ func (kp *kopiaProvider) Close(ctx context.Context) {
|
|||
kp.bkRepo.Close(ctx)
|
||||
}
|
||||
|
||||
//RunBackup which will backup specific path and update backup progress in pvb status
|
||||
//RunBackup which will backup specific path and update backup progress
|
||||
func (kp *kopiaProvider) RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater velerov1api.ProgressUpdater) (string, error) {
|
||||
updater uploader.ProgressUpdater) (string, error) {
|
||||
if updater == nil {
|
||||
return "", errors.New("Need to inital backup progress updater first")
|
||||
return "", errors.New("Need to initial backup progress updater first")
|
||||
}
|
||||
|
||||
log := kp.log.WithFields(logrus.Fields{
|
||||
|
@ -122,25 +117,29 @@ func (kp *kopiaProvider) RunBackup(
|
|||
"parentSnapshot": parentSnapshot,
|
||||
})
|
||||
repoWriter := kopia.NewShimRepo(kp.bkRepo)
|
||||
kp.uploader = snapshotfs.NewUploader(repoWriter)
|
||||
kpUploader := snapshotfs.NewUploader(repoWriter)
|
||||
prorgess := new(kopia.KopiaProgress)
|
||||
prorgess.InitThrottle(backupProgressCheckInterval)
|
||||
prorgess.Updater = updater
|
||||
kp.uploader.Progress = prorgess
|
||||
|
||||
kpUploader.Progress = prorgess
|
||||
quit := make(chan struct{})
|
||||
log.Info("Starting backup")
|
||||
go kp.CheckContext(ctx)
|
||||
go kp.CheckContext(ctx, quit, nil, kpUploader)
|
||||
|
||||
snapshotInfo, err := BackupFunc(ctx, kp.uploader, repoWriter, path, parentSnapshot, log)
|
||||
defer func() {
|
||||
close(quit)
|
||||
}()
|
||||
|
||||
snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
|
||||
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "Failed to run kopia backup")
|
||||
} else if snapshotInfo == nil {
|
||||
return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
|
||||
}
|
||||
|
||||
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
|
||||
updater.UpdateProgress(
|
||||
&velerov1api.UploaderProgress{
|
||||
&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotInfo.Size,
|
||||
BytesDone: snapshotInfo.Size,
|
||||
},
|
||||
|
@ -162,12 +161,12 @@ func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
|
|||
return strings.TrimSpace(rawPass), nil
|
||||
}
|
||||
|
||||
//RunRestore which will restore specific path and update restore progress in pvr status
|
||||
//RunRestore which will restore specific path and update restore progress
|
||||
func (kp *kopiaProvider) RunRestore(
|
||||
ctx context.Context,
|
||||
snapshotID string,
|
||||
volumePath string,
|
||||
updater velerov1api.ProgressUpdater) error {
|
||||
updater uploader.ProgressUpdater) error {
|
||||
log := kp.log.WithFields(logrus.Fields{
|
||||
"snapshotID": snapshotID,
|
||||
"volumePath": volumePath,
|
||||
|
@ -176,22 +175,27 @@ func (kp *kopiaProvider) RunRestore(
|
|||
prorgess := new(kopia.KopiaProgress)
|
||||
prorgess.InitThrottle(restoreProgressCheckInterval)
|
||||
prorgess.Updater = updater
|
||||
kp.restoreCancel = make(chan struct{})
|
||||
defer func() {
|
||||
if kp.restoreCancel != nil {
|
||||
close(kp.restoreCancel)
|
||||
}
|
||||
}()
|
||||
restoreCancel := make(chan struct{})
|
||||
quit := make(chan struct{})
|
||||
|
||||
log.Info("Starting restore")
|
||||
go kp.CheckContext(ctx)
|
||||
size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, kp.restoreCancel)
|
||||
go kp.CheckContext(ctx, quit, restoreCancel, nil)
|
||||
|
||||
defer func() {
|
||||
if restoreCancel != nil {
|
||||
close(restoreCancel)
|
||||
}
|
||||
close(quit)
|
||||
}()
|
||||
|
||||
size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, restoreCancel)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Failed to run kopia restore")
|
||||
}
|
||||
|
||||
updater.UpdateProgress(&velerov1api.UploaderProgress{
|
||||
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: size,
|
||||
BytesDone: size,
|
||||
})
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/controller"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
|
||||
|
@ -36,8 +37,7 @@ import (
|
|||
func TestRunBackup(t *testing.T) {
|
||||
var kp kopiaProvider
|
||||
kp.log = logrus.New()
|
||||
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
|
||||
updater := velerov1api.NewBackupProgressUpdater(&velerov1api.PodVolumeBackup{}, kp.log, context.Background(), fakeClient)
|
||||
updater := controller.BackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
testCases := []struct {
|
||||
name string
|
||||
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error)
|
||||
|
@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) {
|
|||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
BackupFunc = tc.hookBackupFunc
|
||||
_, err := kp.RunBackup(context.Background(), "var", nil, "", updater)
|
||||
_, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
|
@ -81,7 +81,7 @@ func TestRunBackup(t *testing.T) {
|
|||
func TestRunRestore(t *testing.T) {
|
||||
var kp kopiaProvider
|
||||
kp.log = logrus.New()
|
||||
updater := velerov1api.NewRestoreProgressUpdater(&velerov1api.PodVolumeRestore{}, kp.log, context.Background(), fake.NewFakeClientWithScheme(scheme.Scheme))
|
||||
updater := controller.RestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
@ -107,7 +107,7 @@ func TestRunRestore(t *testing.T) {
|
|||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
RestoreFunc = tc.hookRestoreFunc
|
||||
err := kp.RunRestore(context.Background(), "", "/var", updater)
|
||||
err := kp.RunRestore(context.Background(), "", "/var", &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
|
|
|
@ -34,37 +34,38 @@ const backupProgressCheckInterval = 10 * time.Second
|
|||
// Provider which is designed for one pod volumn to do the backup or restore
|
||||
type Provider interface {
|
||||
// RunBackup which will do backup for one specific volumn and return snapshotID error
|
||||
// updater which is used for update backup progress into related pvb status
|
||||
// updater is used for updating backup progress which implement by third-party
|
||||
RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater velerov1api.ProgressUpdater) (string, error)
|
||||
updater uploader.ProgressUpdater) (string, error)
|
||||
// RunRestore which will do restore for one specific volumn with given snapshot id and return error
|
||||
// updateFunc which is used for update restore progress into related pvr status
|
||||
// updater is used for updating backup progress which implement by third-party
|
||||
RunRestore(
|
||||
ctx context.Context,
|
||||
snapshotID string,
|
||||
volumePath string,
|
||||
updater velerov1api.ProgressUpdater) error
|
||||
updater uploader.ProgressUpdater) error
|
||||
// Close which will close related repository
|
||||
Close(ctx context.Context)
|
||||
}
|
||||
|
||||
//NewUploaderProvider initialize provider with specific uploader_type
|
||||
// NewUploaderProvider initialize provider with specific uploaderType
|
||||
func NewUploaderProvider(
|
||||
ctx context.Context,
|
||||
uploader_type string,
|
||||
uploaderType string,
|
||||
repoIdentifier string,
|
||||
bsl *velerov1api.BackupStorageLocation,
|
||||
backupReo *velerov1api.BackupRepository,
|
||||
credGetter *credentials.CredentialGetter,
|
||||
repoKeySelector *v1.SecretKeySelector,
|
||||
log logrus.FieldLogger,
|
||||
) (Provider, error) {
|
||||
if uploader_type == uploader.KopiaType {
|
||||
if uploaderType == uploader.KopiaType {
|
||||
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
|
||||
} else {
|
||||
return NewKopiaUploaderProvider(ctx, credGetter, bsl, log)
|
||||
return NewKopiaUploaderProvider(ctx, credGetter, backupReo, log)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,3 +40,14 @@ type SnapshotInfo struct {
|
|||
ID string `json:"id"`
|
||||
Size int64 `json:"Size"`
|
||||
}
|
||||
|
||||
//UploaderProgress which defined two variables to record progress
|
||||
type UploaderProgress struct {
|
||||
TotalBytes int64 `json:"totalBytes,omitempty"`
|
||||
BytesDone int64 `json:"doneBytes,omitempty"`
|
||||
}
|
||||
|
||||
//UploaderProgress which defined generic interface to update progress
|
||||
type ProgressUpdater interface {
|
||||
UpdateProgress(p *UploaderProgress)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue