Fix snapshot leak for backup
Signed-off-by: Ming Qiu <ming.qiu@broadcom.com>
Branch: pull/7558/head
parent
3d6dab0708
commit
3d5282e12b
|
@ -0,0 +1 @@
|
|||
Fix snapshot leak for backup
|
|
@ -412,8 +412,8 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
|
|||
}
|
||||
|
||||
if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i],
|
||||
fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
|
||||
time.Now(), s.logger); err != nil {
|
||||
fmt.Errorf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
|
||||
"", time.Now(), s.logger); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -401,7 +401,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
|
|||
}
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
|
||||
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) {
|
||||
defer r.closeDataPath(ctx, duName)
|
||||
|
||||
log := r.logger.WithField("dataupload", duName)
|
||||
|
@ -698,6 +698,9 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
|
|||
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
|
||||
}
|
||||
|
||||
if dataPathError, ok := err.(datapath.DataPathError); ok {
|
||||
du.Status.SnapshotID = dataPathError.GetSnapshotID()
|
||||
}
|
||||
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
|
||||
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
|
||||
log.WithError(patchErr).Error("error updating DataUpload status")
|
||||
|
|
|
@ -124,6 +124,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
|||
}
|
||||
|
||||
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, pVBRRequestor, ctx, r.Client, pvb.Namespace, callbacks, log)
|
||||
|
||||
if err != nil {
|
||||
if err == datapath.ConcurrentLimitExceed {
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
|
||||
|
@ -225,7 +226,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam
|
|||
log.Info("PodVolumeBackup completed")
|
||||
}
|
||||
|
||||
func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
|
||||
func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) {
|
||||
defer r.closeDataPath(ctx, pvbName)
|
||||
|
||||
log := r.logger.WithField("pvb", pvbName)
|
||||
|
@ -348,17 +349,19 @@ func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName s
|
|||
|
||||
func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
|
||||
r.closeDataPath(ctx, pvb.Name)
|
||||
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.clock.Now(), log)
|
||||
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, err, msg, r.clock.Now(), log)
|
||||
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time, log logrus.FieldLogger) error {
|
||||
func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errOut error, msg string, time time.Time, log logrus.FieldLogger) error {
|
||||
original := pvb.DeepCopy()
|
||||
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
|
||||
pvb.Status.Message = errString
|
||||
pvb.Status.CompletionTimestamp = &metav1.Time{Time: time}
|
||||
|
||||
if dataPathError, ok := errOut.(datapath.DataPathError); ok {
|
||||
pvb.Status.SnapshotID = dataPathError.GetSnapshotID()
|
||||
}
|
||||
pvb.Status.Message = errors.WithMessage(errOut, msg).Error()
|
||||
err := c.Patch(ctx, pvb, client.MergeFrom(original))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error updating PodVolumeBackup status")
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package datapath
|
||||
|
||||
// DataPathError represents an error that occurred during a backup or restore operation
|
||||
type DataPathError struct {
|
||||
snapshotID string
|
||||
err error
|
||||
}
|
||||
|
||||
// Error implements error.
|
||||
func (e DataPathError) Error() string {
|
||||
return e.err.Error()
|
||||
}
|
||||
|
||||
// GetSnapshotID returns the snapshot ID for the error.
|
||||
func (e DataPathError) GetSnapshotID() string {
|
||||
return e.snapshotID
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package datapath
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetSnapshotID(t *testing.T) {
|
||||
// Create a DataPathError instance for testing
|
||||
err := DataPathError{snapshotID: "123", err: errors.New("example error")}
|
||||
// Call the GetSnapshotID method to retrieve the snapshot ID
|
||||
snapshotID := err.GetSnapshotID()
|
||||
// Check if the retrieved snapshot ID matches the expected value
|
||||
if snapshotID != "123" {
|
||||
t.Errorf("GetSnapshotID() returned unexpected snapshot ID: got %s, want %s", snapshotID, "123")
|
||||
}
|
||||
}
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
// Create a DataPathError instance for testing
|
||||
err := DataPathError{snapshotID: "123", err: errors.New("example error")}
|
||||
// Call the Error method to retrieve the error message
|
||||
errMsg := err.Error()
|
||||
// Check if the retrieved error message matches the expected value
|
||||
expectedErrMsg := "example error"
|
||||
if errMsg != expectedErrMsg {
|
||||
t.Errorf("Error() returned unexpected error message: got %s, want %s", errMsg, expectedErrMsg)
|
||||
}
|
||||
}
|
|
@ -141,7 +141,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
|
|||
if err == provider.ErrorCanceled {
|
||||
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
|
||||
} else if err != nil {
|
||||
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
|
||||
dataPathErr := DataPathError{
|
||||
snapshotID: snapshotID,
|
||||
err: err,
|
||||
}
|
||||
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
|
||||
} else {
|
||||
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
|
||||
}
|
||||
|
@ -161,7 +165,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
|
|||
if err == provider.ErrorCanceled {
|
||||
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
|
||||
} else if err != nil {
|
||||
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
|
||||
dataPathErr := DataPathError{
|
||||
snapshotID: snapshotID,
|
||||
err: err,
|
||||
}
|
||||
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
|
||||
} else {
|
||||
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ func TestAsyncBackup(t *testing.T) {
|
|||
var asyncErr error
|
||||
var asyncResult Result
|
||||
finish := make(chan struct{})
|
||||
|
||||
var failErr = errors.New("fake-fail-error")
|
||||
tests := []struct {
|
||||
name string
|
||||
uploaderProv provider.Provider
|
||||
|
@ -49,12 +49,12 @@ func TestAsyncBackup(t *testing.T) {
|
|||
OnCompleted: nil,
|
||||
OnCancelled: nil,
|
||||
OnFailed: func(ctx context.Context, namespace string, job string, err error) {
|
||||
asyncErr = err
|
||||
asyncErr = failErr
|
||||
asyncResult = Result{}
|
||||
finish <- struct{}{}
|
||||
},
|
||||
},
|
||||
err: errors.New("fake-error"),
|
||||
err: failErr,
|
||||
},
|
||||
{
|
||||
name: "async backup cancel",
|
||||
|
@ -117,7 +117,7 @@ func TestAsyncRestore(t *testing.T) {
|
|||
var asyncErr error
|
||||
var asyncResult Result
|
||||
finish := make(chan struct{})
|
||||
|
||||
var failErr = errors.New("fake-fail-error")
|
||||
tests := []struct {
|
||||
name string
|
||||
uploaderProv provider.Provider
|
||||
|
@ -133,12 +133,12 @@ func TestAsyncRestore(t *testing.T) {
|
|||
OnCompleted: nil,
|
||||
OnCancelled: nil,
|
||||
OnFailed: func(ctx context.Context, namespace string, job string, err error) {
|
||||
asyncErr = err
|
||||
asyncErr = failErr
|
||||
asyncResult = Result{}
|
||||
finish <- struct{}{}
|
||||
},
|
||||
},
|
||||
err: errors.New("fake-error"),
|
||||
err: failErr,
|
||||
},
|
||||
{
|
||||
name: "async restore cancel",
|
||||
|
|
|
@ -55,6 +55,7 @@ var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
|
|||
var restoreEntryFunc = restore.Entry
|
||||
|
||||
const UploaderConfigMultipartKey = "uploader-multipart"
|
||||
const MaxErrorReported = 10
|
||||
|
||||
// SnapshotUploader which mainly used for UT test that could overwrite Upload interface
|
||||
type SnapshotUploader interface {
|
||||
|
@ -182,17 +183,14 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
|
|||
}
|
||||
|
||||
kopiaCtx := kopia.SetupKopiaLog(ctx, log)
|
||||
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
|
||||
snapshotInfo := &uploader.SnapshotInfo{
|
||||
ID: snapID,
|
||||
Size: snapshotSize,
|
||||
}
|
||||
|
||||
return snapshotInfo, false, nil
|
||||
return snapshotInfo, false, err
|
||||
}
|
||||
|
||||
func getLocalFSEntry(path0 string) (fs.Entry, error) {
|
||||
|
@ -307,6 +305,10 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
|
|||
var errs []string
|
||||
if ds := manifest.RootEntry.DirSummary; ds != nil {
|
||||
for _, ent := range ds.FailedEntries {
|
||||
if len(errs) > MaxErrorReported {
|
||||
errs = append(errs, "too many errors, ignored...")
|
||||
break
|
||||
}
|
||||
policy := policyTree.EffectivePolicy()
|
||||
if !(policy != nil && bool(*policy.ErrorHandlingPolicy.IgnoreUnknownTypes) && strings.Contains(ent.Error, fs.ErrUnknown.Error())) {
|
||||
errs = append(errs, fmt.Sprintf("Error when processing %v: %v", ent.EntryPath, ent.Error))
|
||||
|
@ -315,7 +317,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
|
|||
}
|
||||
|
||||
if len(errs) != 0 {
|
||||
return "", 0, errors.New(strings.Join(errs, "\n"))
|
||||
return string(manifestID), snapSize, errors.New(strings.Join(errs, "\n"))
|
||||
}
|
||||
|
||||
return string(manifestID), snapSize, nil
|
||||
|
|
|
@ -227,8 +227,8 @@ func TestReportSnapshotStatus(t *testing.T) {
|
|||
},
|
||||
{
|
||||
shouldError: true,
|
||||
expectedResult: "",
|
||||
expectedSize: 0,
|
||||
expectedResult: "sample-manifest-id",
|
||||
expectedSize: 1024,
|
||||
directorySummary: &fs.DirectorySummary{
|
||||
FailedEntries: []*fs.EntryWithError{
|
||||
{
|
||||
|
|
|
@ -141,6 +141,7 @@ func (kp *kopiaProvider) RunBackup(
|
|||
progress.Updater = updater
|
||||
progress.Log = log
|
||||
kpUploader.Progress = progress
|
||||
kpUploader.FailFast = true
|
||||
quit := make(chan struct{})
|
||||
log.Info("Starting backup")
|
||||
go kp.CheckContext(ctx, quit, nil, kpUploader)
|
||||
|
@ -167,19 +168,20 @@ func (kp *kopiaProvider) RunBackup(
|
|||
uploaderCfg[kopia.UploaderConfigMultipartKey] = "true"
|
||||
}
|
||||
|
||||
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
|
||||
snapshotInfo, _, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
|
||||
if err != nil {
|
||||
if kpUploader.IsCanceled() {
|
||||
log.Error("Kopia backup is canceled")
|
||||
return "", false, ErrorCanceled
|
||||
snapshotID := ""
|
||||
if snapshotInfo != nil {
|
||||
snapshotID = snapshotInfo.ID
|
||||
} else {
|
||||
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
|
||||
log.Infof("Kopia backup failed with %v and get empty snapshot ID", err)
|
||||
}
|
||||
} else if isSnapshotEmpty {
|
||||
log.Debugf("Kopia backup got empty dir with path %s", path)
|
||||
return "", true, nil
|
||||
} else if snapshotInfo == nil {
|
||||
return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
|
||||
|
||||
if kpUploader.IsCanceled() {
|
||||
log.Warn("Kopia backup is canceled")
|
||||
return snapshotID, false, ErrorCanceled
|
||||
}
|
||||
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
|
||||
}
|
||||
|
||||
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
|
||||
|
|
|
@ -90,13 +90,6 @@ func TestRunBackup(t *testing.T) {
|
|||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "got empty snapshot",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
|
||||
return nil, true, errors.New("snapshot is empty")
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "success to backup block mode volume",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
|
||||
|
|
Loading…
Reference in New Issue