Merge pull request #7554 from blackpiglet/7357_fix

Support update the backup VolumeInfos by the Async ops result.
pull/7604/head
Xun Jiang/Bruce Jiang 2024-04-01 11:05:33 +08:00 committed by GitHub
commit 75962653c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 485 additions and 49 deletions

View File

@ -0,0 +1 @@
Support update the backup VolumeInfos by the Async ops result.

View File

@ -76,6 +76,9 @@ type VolumeInfo struct {
// Snapshot starts timestamp.
StartTimestamp *metav1.Time `json:"startTimestamp,omitempty"`
// Snapshot completes timestamp.
CompletionTimestamp *metav1.Time `json:"completionTimestamp,omitempty"`
CSISnapshotInfo *CSISnapshotInfo `json:"csiSnapshotInfo,omitempty"`
SnapshotDataMovementInfo *SnapshotDataMovementInfo `json:"snapshotDataMovementInfo,omitempty"`
NativeSnapshotInfo *NativeSnapshotInfo `json:"nativeSnapshotInfo,omitempty"`
@ -119,6 +122,9 @@ type SnapshotDataMovementInfo struct {
// The Async Operation's ID.
OperationID string `json:"operationID"`
// Moved snapshot data size.
Size int64 `json:"size"`
}
// NativeSnapshotInfo is used for displaying the Velero native snapshot status.
@ -379,7 +385,6 @@ func (v *VolumesInformation) generateVolumeInfoForCSIVolumeSnapshot() {
Skipped: false,
SnapshotDataMoved: false,
PreserveLocalSnapshot: true,
StartTimestamp: &(volumeSnapshot.CreationTimestamp),
CSISnapshotInfo: &CSISnapshotInfo{
VSCName: *volumeSnapshot.Status.BoundVolumeSnapshotContentName,
Size: size,
@ -393,6 +398,10 @@ func (v *VolumesInformation) generateVolumeInfoForCSIVolumeSnapshot() {
},
}
if volumeSnapshot.Status.CreationTime != nil {
volumeInfo.StartTimestamp = volumeSnapshot.Status.CreationTime
}
tmpVolumeInfos = append(tmpVolumeInfos, volumeInfo)
} else {
v.logger.Warnf("cannot find info for PVC %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Spec.Source.PersistentVolumeClaimName)
@ -412,7 +421,6 @@ func (v *VolumesInformation) generateVolumeInfoFromPVB() {
BackupMethod: PodVolumeBackup,
SnapshotDataMoved: false,
Skipped: false,
StartTimestamp: pvb.Status.StartTimestamp,
PVBInfo: &PodVolumeBackupInfo{
SnapshotHandle: pvb.Status.SnapshotID,
Size: pvb.Status.Progress.TotalBytes,
@ -424,6 +432,14 @@ func (v *VolumesInformation) generateVolumeInfoFromPVB() {
},
}
if pvb.Status.StartTimestamp != nil {
volumeInfo.StartTimestamp = pvb.Status.StartTimestamp
}
if pvb.Status.CompletionTimestamp != nil {
volumeInfo.CompletionTimestamp = pvb.Status.CompletionTimestamp
}
pod := new(corev1api.Pod)
pvcName := ""
err := v.crClient.Get(context.TODO(), kbclient.ObjectKey{Namespace: pvb.Spec.Pod.Namespace, Name: pvb.Spec.Pod.Name}, pod)
@ -522,7 +538,6 @@ func (v *VolumesInformation) generateVolumeInfoFromDataUpload() {
PVName: pvcPVInfo.PV.Name,
SnapshotDataMoved: true,
Skipped: false,
StartTimestamp: operation.Status.Created,
CSISnapshotInfo: &CSISnapshotInfo{
SnapshotHandle: FieldValueIsUnknown,
VSCName: FieldValueIsUnknown,
@ -540,6 +555,10 @@ func (v *VolumesInformation) generateVolumeInfoFromDataUpload() {
},
}
if dataUpload.Status.StartTimestamp != nil {
volumeInfo.StartTimestamp = dataUpload.Status.StartTimestamp
}
tmpVolumeInfos = append(tmpVolumeInfos, volumeInfo)
} else {
v.logger.Warnf("Cannot find info for PVC %s/%s", operation.Spec.ResourceIdentifier.Namespace, operation.Spec.ResourceIdentifier.Name)

View File

@ -372,6 +372,7 @@ func TestGenerateVolumeInfoForCSIVolumeSnapshot(t *testing.T) {
},
Status: &snapshotv1api.VolumeSnapshotStatus{
BoundVolumeSnapshotContentName: stringPtr("testContent"),
CreationTime: &now,
RestoreSize: &resourceQuantity,
},
},
@ -458,6 +459,7 @@ func TestGenerateVolumeInfoForCSIVolumeSnapshot(t *testing.T) {
}
func TestGenerateVolumeInfoFromPVB(t *testing.T) {
now := metav1.Now()
tests := []struct {
name string
pvb *velerov1api.PodVolumeBackup
@ -542,7 +544,7 @@ func TestGenerateVolumeInfoFromPVB(t *testing.T) {
},
},
},
pvb: builder.ForPodVolumeBackup("velero", "testPVB").PodName("testPod").PodNamespace("velero").Result(),
pvb: builder.ForPodVolumeBackup("velero", "testPVB").PodName("testPod").PodNamespace("velero").StartTimestamp(&now).CompletionTimestamp(&now).Result(),
pod: builder.ForPod("velero", "testPod").Containers(&corev1api.Container{
Name: "test",
VolumeMounts: []corev1api.VolumeMount{
@ -563,10 +565,12 @@ func TestGenerateVolumeInfoFromPVB(t *testing.T) {
).Result(),
expectedVolumeInfos: []*VolumeInfo{
{
PVCName: "testPVC",
PVCNamespace: "velero",
PVName: "testPV",
BackupMethod: PodVolumeBackup,
PVCName: "testPVC",
PVCNamespace: "velero",
PVName: "testPV",
BackupMethod: PodVolumeBackup,
StartTimestamp: &now,
CompletionTimestamp: &now,
PVBInfo: &PodVolumeBackupInfo{
PodName: "testPod",
PodNamespace: "velero",
@ -605,9 +609,12 @@ func TestGenerateVolumeInfoFromPVB(t *testing.T) {
}
func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
// The unstructured conversion will loose the time precision to second
// level. To make test pass. Set the now precision at second at the
// beginning.
now := metav1.Now().Rfc3339Copy()
features.Enable(velerov1api.CSIFeatureFlag)
defer features.Disable(velerov1api.CSIFeatureFlag)
now := metav1.Now()
tests := []struct {
name string
volumeSnapshotClass *snapshotv1api.VolumeSnapshotClass
@ -685,7 +692,7 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
name: "VolumeSnapshotClass cannot be found for operation",
dataUpload: builder.ForDataUpload("velero", "testDU").DataMover("velero").CSISnapshot(&velerov2alpha1.CSISnapshotSpec{
VolumeSnapshot: "testVS",
}).SnapshotID("testSnapshotHandle").Result(),
}).SnapshotID("testSnapshotHandle").StartTimestamp(&now).Result(),
operation: &itemoperation.BackupOperation{
Spec: itemoperation.BackupOperationSpec{
OperationID: "testOperation",
@ -731,6 +738,7 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
PVName: "testPV",
BackupMethod: CSISnapshot,
SnapshotDataMoved: true,
StartTimestamp: &now,
CSISnapshotInfo: &CSISnapshotInfo{
SnapshotHandle: FieldValueIsUnknown,
VSCName: FieldValueIsUnknown,
@ -754,7 +762,7 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
dataUpload: builder.ForDataUpload("velero", "testDU").DataMover("velero").CSISnapshot(&velerov2alpha1.CSISnapshotSpec{
VolumeSnapshot: "testVS",
SnapshotClass: "testClass",
}).SnapshotID("testSnapshotHandle").Result(),
}).SnapshotID("testSnapshotHandle").StartTimestamp(&now).Result(),
volumeSnapshotClass: builder.ForVolumeSnapshotClass("testClass").Driver("pd.csi.storage.gke.io").Result(),
operation: &itemoperation.BackupOperation{
Spec: itemoperation.BackupOperationSpec{
@ -778,9 +786,6 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
},
},
},
Status: itemoperation.OperationStatus{
Created: &now,
},
},
pvMap: map[string]pvcPvInfo{
"testPV": {
@ -853,7 +858,12 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
volumesInfo.logger = logging.DefaultLogger(logrus.DebugLevel, logging.FormatJSON)
volumesInfo.generateVolumeInfoFromDataUpload()
require.Equal(t, tc.expectedVolumeInfos, volumesInfo.volumeInfos)
if len(tc.expectedVolumeInfos) > 0 {
require.Equal(t, tc.expectedVolumeInfos[0].PVInfo, volumesInfo.volumeInfos[0].PVInfo)
require.Equal(t, tc.expectedVolumeInfos[0].SnapshotDataMovementInfo, volumesInfo.volumeInfos[0].SnapshotDataMovementInfo)
require.Equal(t, tc.expectedVolumeInfos[0].CSISnapshotInfo, volumesInfo.volumeInfos[0].CSISnapshotInfo)
}
})
}
}

View File

@ -18,6 +18,7 @@ package backup
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/json"
@ -32,18 +33,21 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"github.com/vmware-tanzu/velero/internal/hook"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/hook"
"github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
@ -66,11 +70,30 @@ const BackupFormatVersion = "1.1.0"
type Backupper interface {
// Backup takes a backup using the specification in the velerov1api.Backup and writes backup and log data
// to the given writers.
Backup(logger logrus.FieldLogger, backup *Request, backupFile io.Writer, actions []biav2.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error
BackupWithResolvers(log logrus.FieldLogger, backupRequest *Request, backupFile io.Writer, backupItemActionResolver framework.BackupItemActionResolverV2, volumeSnapshotterGetter VolumeSnapshotterGetter) error
FinalizeBackup(log logrus.FieldLogger, backupRequest *Request, inBackupFile io.Reader, outBackupFile io.Writer,
Backup(
logger logrus.FieldLogger,
backup *Request,
backupFile io.Writer,
actions []biav2.BackupItemAction,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) error
BackupWithResolvers(
log logrus.FieldLogger,
backupRequest *Request,
backupFile io.Writer,
backupItemActionResolver framework.BackupItemActionResolverV2,
asyncBIAOperations []*itemoperation.BackupOperation) error
volumeSnapshotterGetter VolumeSnapshotterGetter,
) error
FinalizeBackup(
log logrus.FieldLogger,
backupRequest *Request,
inBackupFile io.Reader,
outBackupFile io.Writer,
backupItemActionResolver framework.BackupItemActionResolverV2,
asyncBIAOperations []*itemoperation.BackupOperation,
) error
}
// kubernetesBackupper implements Backupper.
@ -84,6 +107,8 @@ type kubernetesBackupper struct {
defaultVolumesToFsBackup bool
clientPageSize int
uploaderType string
pluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
}
func (i *itemKey) String() string {
@ -111,6 +136,8 @@ func NewKubernetesBackupper(
defaultVolumesToFsBackup bool,
clientPageSize int,
uploaderType string,
pluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
) (Backupper, error) {
return &kubernetesBackupper{
kbClient: kbClient,
@ -122,6 +149,8 @@ func NewKubernetesBackupper(
defaultVolumesToFsBackup: defaultVolumesToFsBackup,
clientPageSize: clientPageSize,
uploaderType: uploaderType,
pluginManager: pluginManager,
backupStoreGetter: backupStoreGetter,
}, nil
}
@ -183,11 +212,13 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
return kb.BackupWithResolvers(log, backupRequest, backupFile, backupItemActions, volumeSnapshotterGetter)
}
func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
func (kb *kubernetesBackupper) BackupWithResolvers(
log logrus.FieldLogger,
backupRequest *Request,
backupFile io.Writer,
backupItemActionResolver framework.BackupItemActionResolverV2,
volumeSnapshotterGetter VolumeSnapshotterGetter) error {
volumeSnapshotterGetter VolumeSnapshotterGetter,
) error {
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()
@ -473,7 +504,13 @@ func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.Grou
return backedUpItem
}
func (kb *kubernetesBackupper) finalizeItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) (bool, []FileForArchive) {
func (kb *kubernetesBackupper) finalizeItem(
log logrus.FieldLogger,
gr schema.GroupResource,
itemBackupper *itemBackupper,
unstructured *unstructured.Unstructured,
preferredGVR schema.GroupVersionResource,
) (bool, []FileForArchive) {
backedUpItem, updateFiles, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, true, true)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors()))
@ -551,12 +588,14 @@ func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
return nil
}
func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
func (kb *kubernetesBackupper) FinalizeBackup(
log logrus.FieldLogger,
backupRequest *Request,
inBackupFile io.Reader,
outBackupFile io.Writer,
backupItemActionResolver framework.BackupItemActionResolverV2,
asyncBIAOperations []*itemoperation.BackupOperation) error {
asyncBIAOperations []*itemoperation.BackupOperation,
) error {
gzw := gzip.NewWriter(outBackupFile)
defer gzw.Close()
tw := tar.NewWriter(gzw)
@ -619,6 +658,8 @@ func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
updateFiles := make(map[string]FileForArchive)
backedUpGroupResources := map[schema.GroupResource]bool{}
unstructuredDataUploads := make([]unstructured.Unstructured, 0)
for i, item := range items {
log.WithFields(map[string]interface{}{
"progress": "",
@ -645,6 +686,10 @@ func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
return
}
if item.groupResource == kuberesource.DataUploads {
unstructuredDataUploads = append(unstructuredDataUploads, unstructured)
}
backedUp, itemFiles := kb.finalizeItem(log, item.groupResource, itemBackupper, &unstructured, item.preferredGVR)
if backedUp {
backedUpGroupResources[item.groupResource] = true
@ -666,6 +711,22 @@ func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", len(backupRequest.BackedUpItems), totalItems)
}
backupStore, volumeInfos, err := kb.getVolumeInfos(*backupRequest.Backup, log)
if err != nil {
log.WithError(err).Errorf("fail to get the backup VolumeInfos for backup %s", backupRequest.Name)
return err
}
if err := updateVolumeInfos(volumeInfos, unstructuredDataUploads, asyncBIAOperations, log); err != nil {
log.WithError(err).Errorf("fail to update VolumeInfos for backup %s", backupRequest.Name)
return err
}
if err := putVolumeInfos(backupRequest.Name, volumeInfos, backupStore); err != nil {
log.WithError(err).Errorf("fail to put the VolumeInfos for backup %s", backupRequest.Name)
return err
}
// write new tar archive replacing files in original with content updateFiles for matches
if err := buildFinalTarball(tr, tw, updateFiles); err != nil {
log.Errorf("Error building final tarball: %s", err.Error())
@ -733,3 +794,100 @@ type tarWriter interface {
Write([]byte) (int, error)
WriteHeader(*tar.Header) error
}
func (kb *kubernetesBackupper) getVolumeInfos(
backup velerov1api.Backup,
log logrus.FieldLogger,
) (persistence.BackupStore, []*volume.VolumeInfo, error) {
location := &velerov1api.BackupStorageLocation{}
if err := kb.kbClient.Get(context.Background(), kbclient.ObjectKey{
Namespace: backup.Namespace,
Name: backup.Spec.StorageLocation,
}, location); err != nil {
return nil, nil, errors.WithStack(err)
}
pluginManager := kb.pluginManager(log)
defer pluginManager.CleanupClients()
backupStore, storeErr := kb.backupStoreGetter.Get(location, pluginManager, log)
if storeErr != nil {
return nil, nil, storeErr
}
volumeInfos, err := backupStore.GetBackupVolumeInfos(backup.Name)
if err != nil {
return nil, nil, err
}
return backupStore, volumeInfos, nil
}
// updateVolumeInfos update the VolumeInfos according to the AsyncOperations
func updateVolumeInfos(
volumeInfos []*volume.VolumeInfo,
unstructuredItems []unstructured.Unstructured,
operations []*itemoperation.BackupOperation,
log logrus.FieldLogger,
) error {
for _, unstructured := range unstructuredItems {
var dataUpload velerov2alpha1.DataUpload
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), &dataUpload)
if err != nil {
log.WithError(err).Errorf("fail to convert DataUpload: %s/%s",
unstructured.GetNamespace(), unstructured.GetName())
return err
}
for index := range volumeInfos {
if volumeInfos[index].PVCName == dataUpload.Spec.SourcePVC &&
volumeInfos[index].PVCNamespace == dataUpload.Spec.SourceNamespace {
if dataUpload.Status.CompletionTimestamp != nil {
volumeInfos[index].CompletionTimestamp = dataUpload.Status.CompletionTimestamp
}
volumeInfos[index].SnapshotDataMovementInfo.SnapshotHandle = dataUpload.Status.SnapshotID
volumeInfos[index].SnapshotDataMovementInfo.RetainedSnapshot = dataUpload.Spec.CSISnapshot.VolumeSnapshot
volumeInfos[index].SnapshotDataMovementInfo.Size = dataUpload.Status.Progress.TotalBytes
}
}
}
// Update CSI snapshot VolumeInfo's CompletionTimestamp by the operation update time.
for volumeIndex := range volumeInfos {
if volumeInfos[volumeIndex].BackupMethod == volume.CSISnapshot &&
volumeInfos[volumeIndex].CSISnapshotInfo != nil {
for opIndex := range operations {
if volumeInfos[volumeIndex].CSISnapshotInfo.OperationID == operations[opIndex].Spec.OperationID {
// The VolumeSnapshot and VolumeSnapshotContent don't have a completion timestamp,
// so use the operation.Status.Updated as the alternative. It is not the exact time
// when the snapshot turns ready, but the operation controller periodically watch the
// VSC and VS status. When the controller finds they reach to the ReadyToUse state,
// The operation.Status.Updated is set as the found time.
volumeInfos[volumeIndex].CompletionTimestamp = operations[opIndex].Status.Updated
}
}
}
}
return nil
}
func putVolumeInfos(
backupName string,
volumeInfos []*volume.VolumeInfo,
backupStore persistence.BackupStore,
) error {
backupVolumeInfoBuf := new(bytes.Buffer)
gzw := gzip.NewWriter(backupVolumeInfoBuf)
defer gzw.Close()
if err := json.NewEncoder(gzw).Encode(volumeInfos); err != nil {
return errors.Wrap(err, "error encoding restore results to JSON")
}
if err := gzw.Close(); err != nil {
return errors.Wrap(err, "error closing gzip writer")
}
return backupStore.PutBackupVolumeInfos(backupName, backupVolumeInfoBuf)
}

View File

@ -43,13 +43,19 @@ import (
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
"github.com/vmware-tanzu/velero/internal/volume"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/features"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/persistence"
persistencemocks "github.com/vmware-tanzu/velero/pkg/persistence/mocks"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
pluginmocks "github.com/vmware-tanzu/velero/pkg/plugin/mocks"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
vsv1 "github.com/vmware-tanzu/velero/pkg/plugin/velero/volumesnapshotter/v1"
@ -4441,3 +4447,144 @@ func TestBackupNamespaces(t *testing.T) {
})
}
}
func TestGetVolumeInfos(t *testing.T) {
h := newHarness(t)
pluginManager := new(pluginmocks.Manager)
backupStore := new(persistencemocks.BackupStore)
h.backupper.pluginManager = func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }
h.backupper.backupStoreGetter = NewFakeSingleObjectBackupStoreGetter(backupStore)
backupStore.On("GetBackupVolumeInfos", "backup-01").Return([]*volume.VolumeInfo{}, nil)
pluginManager.On("CleanupClients").Return()
backup := builder.ForBackup("velero", "backup-01").StorageLocation("default").Result()
bsl := builder.ForBackupStorageLocation("velero", "default").Result()
require.NoError(t, h.backupper.kbClient.Create(context.Background(), bsl))
_, _, err := h.backupper.getVolumeInfos(*backup, h.log)
require.NoError(t, err)
}
func TestUpdateVolumeInfos(t *testing.T) {
timeExample := time.Date(2014, 6, 5, 11, 56, 45, 0, time.Local)
now := metav1.NewTime(timeExample)
logger := logrus.StandardLogger()
tests := []struct {
name string
operations []*itemoperation.BackupOperation
dataUpload *velerov2alpha1.DataUpload
volumeInfos []*volume.VolumeInfo
expectedVolumeInfos []*volume.VolumeInfo
}{
{
name: "CSISnapshot VolumeInfo update",
operations: []*itemoperation.BackupOperation{
{
Spec: itemoperation.BackupOperationSpec{
OperationID: "test-operation",
},
Status: itemoperation.OperationStatus{
Updated: &now,
},
},
},
volumeInfos: []*volume.VolumeInfo{
{
BackupMethod: volume.CSISnapshot,
CompletionTimestamp: &metav1.Time{},
CSISnapshotInfo: &volume.CSISnapshotInfo{
OperationID: "test-operation",
},
},
},
expectedVolumeInfos: []*volume.VolumeInfo{
{
BackupMethod: volume.CSISnapshot,
CompletionTimestamp: &now,
CSISnapshotInfo: &volume.CSISnapshotInfo{
OperationID: "test-operation",
},
},
},
},
{
name: "DataUpload VolumeInfo update",
operations: []*itemoperation.BackupOperation{},
dataUpload: builder.ForDataUpload("velero", "du-1").
CompletionTimestamp(&now).
CSISnapshot(&velerov2alpha1.CSISnapshotSpec{VolumeSnapshot: "vs-1"}).
SnapshotID("snapshot-id").
Progress(shared.DataMoveOperationProgress{TotalBytes: 1000}).
SourceNamespace("ns-1").
SourcePVC("pvc-1").
Result(),
volumeInfos: []*volume.VolumeInfo{
{
PVCName: "pvc-1",
PVCNamespace: "ns-1",
CompletionTimestamp: &metav1.Time{},
SnapshotDataMovementInfo: &volume.SnapshotDataMovementInfo{
DataMover: "velero",
},
},
},
expectedVolumeInfos: []*volume.VolumeInfo{
{
PVCName: "pvc-1",
PVCNamespace: "ns-1",
CompletionTimestamp: &now,
SnapshotDataMovementInfo: &volume.SnapshotDataMovementInfo{
DataMover: "velero",
RetainedSnapshot: "vs-1",
SnapshotHandle: "snapshot-id",
Size: 1000,
},
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
unstructures := []unstructured.Unstructured{}
if tc.dataUpload != nil {
duMap, error := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.dataUpload)
require.NoError(t, error)
unstructures = append(unstructures,
unstructured.Unstructured{
Object: duMap,
},
)
}
require.NoError(t, updateVolumeInfos(tc.volumeInfos, unstructures, tc.operations, logger))
require.Equal(t, tc.expectedVolumeInfos[0].CompletionTimestamp, tc.volumeInfos[0].CompletionTimestamp)
require.Equal(t, tc.expectedVolumeInfos[0].SnapshotDataMovementInfo, tc.volumeInfos[0].SnapshotDataMovementInfo)
})
}
}
func TestPutVolumeInfos(t *testing.T) {
backupName := "backup-01"
backupStore := new(persistencemocks.BackupStore)
backupStore.On("PutBackupVolumeInfos", mock.Anything, mock.Anything).Return(nil)
require.NoError(t, putVolumeInfos(backupName, []*volume.VolumeInfo{}, backupStore))
}
type fakeSingleObjectBackupStoreGetter struct {
store persistence.BackupStore
}
func (f *fakeSingleObjectBackupStoreGetter) Get(*velerov1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return f.store, nil
}
// NewFakeSingleObjectBackupStoreGetter returns an ObjectBackupStoreGetter
// that will return only the given BackupStore.
func NewFakeSingleObjectBackupStoreGetter(store persistence.BackupStore) persistence.ObjectBackupStoreGetter {
return &fakeSingleObjectBackupStoreGetter{store: store}
}

View File

@ -19,6 +19,7 @@ package builder
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)
@ -115,8 +116,14 @@ func (d *DataUploadBuilder) CSISnapshot(cSISnapshot *velerov2alpha1api.CSISnapsh
}
// StartTimestamp sets the DataUpload's StartTimestamp.
func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBuilder {
d.object.Status.StartTimestamp = startTime
func (d *DataUploadBuilder) StartTimestamp(startTimestamp *metav1.Time) *DataUploadBuilder {
d.object.Status.StartTimestamp = startTimestamp
return d
}
// CompletionTimestamp sets the DataUpload's StartTimestamp.
func (d *DataUploadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataUploadBuilder {
d.object.Status.CompletionTimestamp = completionTimestamp
return d
}
@ -125,3 +132,8 @@ func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder
d.object.Labels = labels
return d
}
func (d *DataUploadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataUploadBuilder {
d.object.Status.Progress = progress
return d
}

View File

@ -80,6 +80,16 @@ func (b *PodVolumeBackupBuilder) SnapshotID(snapshotID string) *PodVolumeBackupB
return b
}
func (b *PodVolumeBackupBuilder) StartTimestamp(startTimestamp *metav1.Time) *PodVolumeBackupBuilder {
b.object.Status.StartTimestamp = startTimestamp
return b
}
func (b *PodVolumeBackupBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *PodVolumeBackupBuilder {
b.object.Status.CompletionTimestamp = completionTimestamp
return b
}
// PodName sets the name of the pod associated with this PodVolumeBackup.
func (b *PodVolumeBackupBuilder) PodName(name string) *PodVolumeBackupBuilder {
b.object.Spec.Pod.Name = name

View File

@ -780,6 +780,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.config.defaultVolumesToFsBackup,
s.config.clientPageSize,
s.config.uploaderType,
newPluginManager,
backupStoreGetter,
)
cmd.CheckError(err)
if err := controller.NewBackupReconciler(
@ -860,6 +862,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.config.defaultVolumesToFsBackup,
s.config.clientPageSize,
s.config.uploaderType,
newPluginManager,
backupStoreGetter,
)
cmd.CheckError(err)
r := controller.NewBackupFinalizerReconciler(

View File

@ -78,9 +78,14 @@ func (b *fakeBackupper) BackupWithResolvers(logger logrus.FieldLogger, backup *p
return args.Error(0)
}
func (b *fakeBackupper) FinalizeBackup(logger logrus.FieldLogger, backup *pkgbackup.Request, inBackupFile io.Reader, outBackupFile io.Writer,
func (b *fakeBackupper) FinalizeBackup(
logger logrus.FieldLogger,
backup *pkgbackup.Request,
inBackupFile io.Reader,
outBackupFile io.Writer,
backupItemActionResolver framework.BackupItemActionResolverV2,
asyncBIAOperations []*itemoperation.BackupOperation) error {
asyncBIAOperations []*itemoperation.BackupOperation,
) error {
args := b.Called(logger, backup, inBackupFile, outBackupFile, backupItemActionResolver, asyncBIAOperations)
return args.Error(0)
}

View File

@ -111,7 +111,11 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
original := backup.DeepCopy()
defer func() {
switch backup.Status.Phase {
case velerov1api.BackupPhaseCompleted, velerov1api.BackupPhasePartiallyFailed, velerov1api.BackupPhaseFailed, velerov1api.BackupPhaseFailedValidation:
case
velerov1api.BackupPhaseCompleted,
velerov1api.BackupPhasePartiallyFailed,
velerov1api.BackupPhaseFailed,
velerov1api.BackupPhaseFailedValidation:
r.backupTracker.Delete(backup.Namespace, backup.Name)
}
// Always attempt to Patch the backup object and status after each reconciliation.
@ -151,7 +155,6 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
var outBackupFile *os.File
if len(operations) > 0 {
// Call itemBackupper.BackupItem for the list of items updated by async operations
log.Info("Setting up finalized backup temp file")
inBackupFile, err := downloadToTempFile(backup.Name, backupStore, log)
if err != nil {
@ -172,7 +175,16 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, errors.WithStack(err)
}
backupItemActionsResolver := framework.NewBackupItemActionResolverV2(actions)
err = r.backupper.FinalizeBackup(log, backupRequest, inBackupFile, outBackupFile, backupItemActionsResolver, operations)
// Call itemBackupper.BackupItem for the list of items updated by async operations
err = r.backupper.FinalizeBackup(
log,
backupRequest,
inBackupFile,
outBackupFile,
backupItemActionsResolver,
operations,
)
if err != nil {
log.WithError(err).Error("error finalizing Backup")
return ctrl.Result{}, errors.WithStack(err)

View File

@ -222,6 +222,8 @@ func TestBackupFinalizerReconcile(t *testing.T) {
backupStore.On("GetBackupContents", mock.Anything).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil)
backupStore.On("PutBackupContents", mock.Anything, mock.Anything).Return(nil)
backupStore.On("PutBackupMetadata", mock.Anything, mock.Anything).Return(nil)
backupStore.On("GetBackupVolumeInfos", mock.Anything).Return(nil, nil)
backupStore.On("PutBackupVolumeInfos", mock.Anything, mock.Anything).Return(nil)
pluginManager.On("GetBackupItemActionsV2").Return(nil, nil)
backupper.On("FinalizeBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, framework.BackupItemActionResolverV2{}, mock.Anything).Return(nil)
_, err := reconciler.Reconcile(context.TODO(), ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}})

View File

@ -35,4 +35,5 @@ var (
VolumeSnapshots = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshots"}
VolumeSnapshotContents = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshotcontents"}
PriorityClasses = schema.GroupResource{Group: "scheduling.k8s.io", Resource: "priorityclasses"}
DataUploads = schema.GroupResource{Group: "velero.io", Resource: "datauploads"}
)

View File

@ -314,7 +314,7 @@ func (_m *BackupStore) GetRestoreItemOperations(name string) ([]*itemoperation.R
return r0, r1
}
// GetRestoreItemOperations provides a mock function with given fields: name
// GetBackupVolumeInfos provides a mock function with given fields: name
func (_m *BackupStore) GetBackupVolumeInfos(name string) ([]*volume.VolumeInfo, error) {
ret := _m.Called(name)
@ -337,6 +337,20 @@ func (_m *BackupStore) GetBackupVolumeInfos(name string) ([]*volume.VolumeInfo,
return r0, r1
}
// PutBackupVolumeInfos provides a mock function with given fields: name, volumeInfo
func (_m *BackupStore) PutBackupVolumeInfos(name string, volumeInfo io.Reader) error {
ret := _m.Called(name, volumeInfo)
var r0 error
if rf, ok := ret.Get(0).(func(string, io.Reader) error); ok {
r0 = rf(name, volumeInfo)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetRestoreResults provides a mock function with given fields: name
func (_m *BackupStore) GetRestoreResults(name string) (map[string]results.Result, error) {
ret := _m.Called(name)

View File

@ -75,6 +75,7 @@ type BackupStore interface {
GetCSIVolumeSnapshotContents(name string) ([]*snapshotv1api.VolumeSnapshotContent, error)
GetCSIVolumeSnapshotClasses(name string) ([]*snapshotv1api.VolumeSnapshotClass, error)
GetBackupVolumeInfos(name string) ([]*volume.VolumeInfo, error)
PutBackupVolumeInfos(name string, volumeInfo io.Reader) error
GetRestoreResults(name string) (map[string]results.Result, error)
// BackupExists checks if the backup metadata file exists in object storage.
@ -516,6 +517,10 @@ func (s *objectBackupStore) GetBackupVolumeInfos(name string) ([]*volume.VolumeI
return volumeInfos, nil
}
func (s *objectBackupStore) PutBackupVolumeInfos(name string, volumeInfo io.Reader) error {
return s.objectStore.PutObject(s.bucket, s.layout.getBackupVolumeInfoKey(name), volumeInfo)
}
func (s *objectBackupStore) GetRestoreResults(name string) (map[string]results.Result, error) {
results := make(map[string]results.Result)

View File

@ -1203,6 +1203,51 @@ func TestGetRestoredResourceList(t *testing.T) {
assert.EqualValues(t, list["pod"], res["pod"])
}
func TestPutBackupVolumeInfos(t *testing.T) {
tests := []struct {
name string
prefix string
expectedErr string
expectedKeys []string
}{
{
name: "normal case",
expectedErr: "",
expectedKeys: []string{
"backups/backup-1/backup-1-volumeinfo.json.gz",
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
harness := newObjectBackupStoreTestHarness("foo", tc.prefix)
volumeInfos := []*volume.VolumeInfo{
{
PVCName: "test",
},
}
buf := new(bytes.Buffer)
gzw := gzip.NewWriter(buf)
defer gzw.Close()
require.NoError(t, json.NewEncoder(gzw).Encode(volumeInfos))
bufferContent := buf.Bytes()
err := harness.PutBackupVolumeInfos("backup-1", buf)
velerotest.AssertErrorMatches(t, tc.expectedErr, err)
assert.Len(t, harness.objectStore.Data[harness.bucket], len(tc.expectedKeys))
for _, key := range tc.expectedKeys {
assert.Contains(t, harness.objectStore.Data[harness.bucket], key)
assert.Equal(t, harness.objectStore.Data[harness.bucket][key], bufferContent)
}
})
}
}
func encodeToBytes(obj runtime.Object) []byte {
res, err := encode.Encode(obj, "json")
if err != nil {

View File

@ -1268,7 +1268,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// want to dynamically re-provision it.
return warnings, errs, itemExists
} else {
obj, err = ctx.handleSkippedPVHasRetainPolicy(obj, resourceID, restoreLogger)
obj, err = ctx.handleSkippedPVHasRetainPolicy(obj, restoreLogger)
if err != nil {
errs.Add(namespace, err)
return warnings, errs, itemExists
@ -1322,7 +1322,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs, itemExists
default:
obj, err = ctx.handleSkippedPVHasRetainPolicy(obj, resourceID, restoreLogger)
obj, err = ctx.handleSkippedPVHasRetainPolicy(obj, restoreLogger)
if err != nil {
errs.Add(namespace, err)
return warnings, errs, itemExists
@ -2497,7 +2497,6 @@ func (ctx *restoreContext) handlePVHasNativeSnapshot(obj *unstructured.Unstructu
func (ctx *restoreContext) handleSkippedPVHasRetainPolicy(
obj *unstructured.Unstructured,
resourceID string,
logger logrus.FieldLogger,
) (*unstructured.Unstructured, error) {
logger.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.")
@ -2508,13 +2507,5 @@ func (ctx *restoreContext) handleSkippedPVHasRetainPolicy(
}
obj = resetVolumeBindingInfo(obj)
// We call the pvRestorer here to clear out the PV's claimRef.UID,
// so it can be re-claimed when its PVC is restored and gets a new UID.
updatedObj, err := ctx.pvRestorer.executePVAction(obj)
if err != nil {
return nil, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err)
}
return updatedObj, nil
return obj, nil
}