Support updating the backup VolumeInfos with the async operations' results.
1. Add the PutBackupVolumeInfos method.
2. Add CompletionTimestamp to VolumeInfo.
3. Add Size to SnapshotDataMovementInfo.
4. Update CompletionTimestamp, SnapshotHandle, RetainedSnapshot, and Size in VolumeInfo when the DataUpload operation completes.

Signed-off-by: Xun Jiang <blackpigletbruce@gmail.com>
pull/7554/head
parent
d640cc16ab
commit
b06d7a467f
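For context, the finalizer-side flow this commit wires up is: read the persisted VolumeInfos back from the backup store (GetBackupVolumeInfos), let FinalizeBackup patch them from the completed async DataUpload operations, and then gzip-encode and re-upload them with the new PutBackupVolumeInfos. As a minimal sketch of the read-back side, here is a hypothetical helper (not part of this PR; the name decodeVolumeInfos is made up) that inverts the encoding added below, using only "compress/gzip", "encoding/json", "io", and the volume package:

func decodeVolumeInfos(r io.Reader) ([]*volume.VolumeInfo, error) {
	// The object written by PutBackupVolumeInfos is a gzip-compressed
	// JSON array of *volume.VolumeInfo.
	gzr, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	defer gzr.Close()

	var volumeInfos []*volume.VolumeInfo
	if err := json.NewDecoder(gzr).Decode(&volumeInfos); err != nil {
		return nil, err
	}
	return volumeInfos, nil
}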
@@ -0,0 +1 @@
+Support updating the backup VolumeInfos with the async operations' results.
@@ -76,6 +76,9 @@ type VolumeInfo struct {
 	// Snapshot start timestamp.
 	StartTimestamp *metav1.Time `json:"startTimestamp,omitempty"`

+	// Snapshot completion timestamp.
+	CompletionTimestamp *metav1.Time `json:"completionTimestamp,omitempty"`
+
 	CSISnapshotInfo          *CSISnapshotInfo          `json:"csiSnapshotInfo,omitempty"`
 	SnapshotDataMovementInfo *SnapshotDataMovementInfo `json:"snapshotDataMovementInfo,omitempty"`
 	NativeSnapshotInfo       *NativeSnapshotInfo       `json:"nativeSnapshotInfo,omitempty"`
@@ -119,6 +122,9 @@ type SnapshotDataMovementInfo struct {

 	// The Async Operation's ID.
 	OperationID string `json:"operationID"`
+
+	// Moved snapshot data size.
+	Size int64 `json:"size"`
 }

 // NativeSnapshotInfo is used for displaying the Velero native snapshot status.
@@ -379,7 +385,6 @@ func (v *VolumesInformation) generateVolumeInfoForCSIVolumeSnapshot() {
 				Skipped:               false,
 				SnapshotDataMoved:     false,
 				PreserveLocalSnapshot: true,
-				StartTimestamp:        &(volumeSnapshot.CreationTimestamp),
 				CSISnapshotInfo: &CSISnapshotInfo{
 					VSCName: *volumeSnapshot.Status.BoundVolumeSnapshotContentName,
 					Size:    size,
@@ -393,6 +398,10 @@ func (v *VolumesInformation) generateVolumeInfoForCSIVolumeSnapshot() {
 				},
 			}

+			if volumeSnapshot.Status.CreationTime != nil {
+				volumeInfo.StartTimestamp = volumeSnapshot.Status.CreationTime
+			}
+
 			tmpVolumeInfos = append(tmpVolumeInfos, volumeInfo)
 		} else {
 			v.logger.Warnf("cannot find info for PVC %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Spec.Source.PersistentVolumeClaimName)
@@ -412,7 +421,6 @@ func (v *VolumesInformation) generateVolumeInfoFromPVB() {
 			BackupMethod:      PodVolumeBackup,
 			SnapshotDataMoved: false,
 			Skipped:           false,
-			StartTimestamp:    pvb.Status.StartTimestamp,
 			PVBInfo: &PodVolumeBackupInfo{
 				SnapshotHandle: pvb.Status.SnapshotID,
 				Size:           pvb.Status.Progress.TotalBytes,
@@ -424,6 +432,14 @@ func (v *VolumesInformation) generateVolumeInfoFromPVB() {
 			},
 		}

+		if pvb.Status.StartTimestamp != nil {
+			volumeInfo.StartTimestamp = pvb.Status.StartTimestamp
+		}
+
+		if pvb.Status.CompletionTimestamp != nil {
+			volumeInfo.CompletionTimestamp = pvb.Status.CompletionTimestamp
+		}
+
 		pod := new(corev1api.Pod)
 		pvcName := ""
 		err := v.crClient.Get(context.TODO(), kbclient.ObjectKey{Namespace: pvb.Spec.Pod.Namespace, Name: pvb.Spec.Pod.Name}, pod)
@@ -522,7 +538,6 @@ func (v *VolumesInformation) generateVolumeInfoFromDataUpload() {
 				PVName:            pvcPVInfo.PV.Name,
 				SnapshotDataMoved: true,
 				Skipped:           false,
-				StartTimestamp:    operation.Status.Created,
 				CSISnapshotInfo: &CSISnapshotInfo{
 					SnapshotHandle: FieldValueIsUnknown,
 					VSCName:        FieldValueIsUnknown,
@@ -540,6 +555,10 @@ func (v *VolumesInformation) generateVolumeInfoFromDataUpload() {
 				},
 			}

+			if dataUpload.Status.StartTimestamp != nil {
+				volumeInfo.StartTimestamp = dataUpload.Status.StartTimestamp
+			}
+
 			tmpVolumeInfos = append(tmpVolumeInfos, volumeInfo)
 		} else {
 			v.logger.Warnf("Cannot find info for PVC %s/%s", operation.Spec.ResourceIdentifier.Namespace, operation.Spec.ResourceIdentifier.Name)
@@ -372,6 +372,7 @@ func TestGenerateVolumeInfoForCSIVolumeSnapshot(t *testing.T) {
 				},
 				Status: &snapshotv1api.VolumeSnapshotStatus{
 					BoundVolumeSnapshotContentName: stringPtr("testContent"),
+					CreationTime:                   &now,
 					RestoreSize:                    &resourceQuantity,
 				},
 			},
@@ -458,6 +459,7 @@ func TestGenerateVolumeInfoForCSIVolumeSnapshot(t *testing.T) {
 }

 func TestGenerateVolumeInfoFromPVB(t *testing.T) {
+	now := metav1.Now()
 	tests := []struct {
 		name string
 		pvb  *velerov1api.PodVolumeBackup
@@ -542,7 +544,7 @@ func TestGenerateVolumeInfoFromPVB(t *testing.T) {
 				},
 			},
 		},
-			pvb: builder.ForPodVolumeBackup("velero", "testPVB").PodName("testPod").PodNamespace("velero").Result(),
+			pvb: builder.ForPodVolumeBackup("velero", "testPVB").PodName("testPod").PodNamespace("velero").StartTimestamp(&now).CompletionTimestamp(&now).Result(),
 			pod: builder.ForPod("velero", "testPod").Containers(&corev1api.Container{
 				Name: "test",
 				VolumeMounts: []corev1api.VolumeMount{
@@ -563,10 +565,12 @@ func TestGenerateVolumeInfoFromPVB(t *testing.T) {
 			).Result(),
 			expectedVolumeInfos: []*VolumeInfo{
 				{
-					PVCName:      "testPVC",
-					PVCNamespace: "velero",
-					PVName:       "testPV",
-					BackupMethod: PodVolumeBackup,
+					PVCName:             "testPVC",
+					PVCNamespace:        "velero",
+					PVName:              "testPV",
+					BackupMethod:        PodVolumeBackup,
+					StartTimestamp:      &now,
+					CompletionTimestamp: &now,
 					PVBInfo: &PodVolumeBackupInfo{
 						PodName:      "testPod",
 						PodNamespace: "velero",
@@ -605,9 +609,12 @@ func TestGenerateVolumeInfoFromPVB(t *testing.T) {
 }

 func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
+	// The unstructured conversion will lose the time precision to the second
+	// level. To make the test pass, set now's precision to the second at the
+	// beginning.
+	now := metav1.Now().Rfc3339Copy()
 	features.Enable(velerov1api.CSIFeatureFlag)
 	defer features.Disable(velerov1api.CSIFeatureFlag)
-	now := metav1.Now()
 	tests := []struct {
 		name                string
 		volumeSnapshotClass *snapshotv1api.VolumeSnapshotClass
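The Rfc3339Copy() call above matters because runtime.DefaultUnstructuredConverter round-trips metav1.Time through its RFC 3339 JSON form, which only keeps second precision. A minimal illustration (metav1 is k8s.io/apimachinery/pkg/apis/meta/v1):

now := metav1.Now()            // may carry sub-second precision
truncated := now.Rfc3339Copy() // truncated to whole seconds
// After a ToUnstructured/FromUnstructured round trip, a timestamp compares
// equal to truncated, not necessarily to the original now.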
@@ -685,7 +692,7 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
 			name: "VolumeSnapshotClass cannot be found for operation",
 			dataUpload: builder.ForDataUpload("velero", "testDU").DataMover("velero").CSISnapshot(&velerov2alpha1.CSISnapshotSpec{
 				VolumeSnapshot: "testVS",
-			}).SnapshotID("testSnapshotHandle").Result(),
+			}).SnapshotID("testSnapshotHandle").StartTimestamp(&now).Result(),
 			operation: &itemoperation.BackupOperation{
 				Spec: itemoperation.BackupOperationSpec{
 					OperationID: "testOperation",
@@ -731,6 +738,7 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
 					PVName:            "testPV",
 					BackupMethod:      CSISnapshot,
 					SnapshotDataMoved: true,
+					StartTimestamp:    &now,
 					CSISnapshotInfo: &CSISnapshotInfo{
 						SnapshotHandle: FieldValueIsUnknown,
 						VSCName:        FieldValueIsUnknown,
@@ -754,7 +762,7 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
 			dataUpload: builder.ForDataUpload("velero", "testDU").DataMover("velero").CSISnapshot(&velerov2alpha1.CSISnapshotSpec{
 				VolumeSnapshot: "testVS",
 				SnapshotClass:  "testClass",
-			}).SnapshotID("testSnapshotHandle").Result(),
+			}).SnapshotID("testSnapshotHandle").StartTimestamp(&now).Result(),
 			volumeSnapshotClass: builder.ForVolumeSnapshotClass("testClass").Driver("pd.csi.storage.gke.io").Result(),
 			operation: &itemoperation.BackupOperation{
 				Spec: itemoperation.BackupOperationSpec{
@@ -778,9 +786,6 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
 						},
 					},
 				},
-				Status: itemoperation.OperationStatus{
-					Created: &now,
-				},
 			},
 			pvMap: map[string]pvcPvInfo{
 				"testPV": {
@@ -853,7 +858,12 @@ func TestGenerateVolumeInfoFromDataUpload(t *testing.T) {
 			volumesInfo.logger = logging.DefaultLogger(logrus.DebugLevel, logging.FormatJSON)

 			volumesInfo.generateVolumeInfoFromDataUpload()
-			require.Equal(t, tc.expectedVolumeInfos, volumesInfo.volumeInfos)
+
+			if len(tc.expectedVolumeInfos) > 0 {
+				require.Equal(t, tc.expectedVolumeInfos[0].PVInfo, volumesInfo.volumeInfos[0].PVInfo)
+				require.Equal(t, tc.expectedVolumeInfos[0].SnapshotDataMovementInfo, volumesInfo.volumeInfos[0].SnapshotDataMovementInfo)
+				require.Equal(t, tc.expectedVolumeInfos[0].CSISnapshotInfo, volumesInfo.volumeInfos[0].CSISnapshotInfo)
+			}
 		})
 	}
 }
@@ -32,15 +32,16 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	kubeerrs "k8s.io/apimachinery/pkg/util/errors"

-	"github.com/vmware-tanzu/velero/internal/hook"
-	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
-	"github.com/vmware-tanzu/velero/pkg/client"
-
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"

+	"github.com/vmware-tanzu/velero/internal/hook"
+	"github.com/vmware-tanzu/velero/internal/volume"
+	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
+	"github.com/vmware-tanzu/velero/pkg/client"
 	"github.com/vmware-tanzu/velero/pkg/discovery"
 	"github.com/vmware-tanzu/velero/pkg/itemoperation"
 	"github.com/vmware-tanzu/velero/pkg/kuberesource"
@@ -66,11 +67,31 @@ const BackupFormatVersion = "1.1.0"
 type Backupper interface {
 	// Backup takes a backup using the specification in the velerov1api.Backup and writes backup and log data
 	// to the given writers.
-	Backup(logger logrus.FieldLogger, backup *Request, backupFile io.Writer, actions []biav2.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error
-	BackupWithResolvers(log logrus.FieldLogger, backupRequest *Request, backupFile io.Writer, backupItemActionResolver framework.BackupItemActionResolverV2, volumeSnapshotterGetter VolumeSnapshotterGetter) error
-	FinalizeBackup(log logrus.FieldLogger, backupRequest *Request, inBackupFile io.Reader, outBackupFile io.Writer,
-		backupItemActionResolver framework.BackupItemActionResolverV2,
-		asyncBIAOperations []*itemoperation.BackupOperation) error
+	Backup(
+		logger logrus.FieldLogger,
+		backup *Request,
+		backupFile io.Writer,
+		actions []biav2.BackupItemAction,
+		volumeSnapshotterGetter VolumeSnapshotterGetter,
+	) error
+
+	BackupWithResolvers(
+		log logrus.FieldLogger,
+		backupRequest *Request,
+		backupFile io.Writer,
+		backupItemActionResolver framework.BackupItemActionResolverV2,
+		volumeSnapshotterGetter VolumeSnapshotterGetter,
+	) error
+
+	FinalizeBackup(
+		log logrus.FieldLogger,
+		backupRequest *Request,
+		inBackupFile io.Reader,
+		outBackupFile io.Writer,
+		backupItemActionResolver framework.BackupItemActionResolverV2,
+		asyncBIAOperations []*itemoperation.BackupOperation,
+		volumeInfos []*volume.VolumeInfo,
+	) error
 }

 // kubernetesBackupper implements Backupper.
@@ -183,11 +204,13 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
 	return kb.BackupWithResolvers(log, backupRequest, backupFile, backupItemActions, volumeSnapshotterGetter)
 }

-func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
+func (kb *kubernetesBackupper) BackupWithResolvers(
+	log logrus.FieldLogger,
 	backupRequest *Request,
 	backupFile io.Writer,
 	backupItemActionResolver framework.BackupItemActionResolverV2,
-	volumeSnapshotterGetter VolumeSnapshotterGetter) error {
+	volumeSnapshotterGetter VolumeSnapshotterGetter,
+) error {
 	gzippedData := gzip.NewWriter(backupFile)
 	defer gzippedData.Close()

@@ -470,7 +493,13 @@ func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.Grou
 	return backedUpItem
 }

-func (kb *kubernetesBackupper) finalizeItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) (bool, []FileForArchive) {
+func (kb *kubernetesBackupper) finalizeItem(
+	log logrus.FieldLogger,
+	gr schema.GroupResource,
+	itemBackupper *itemBackupper,
+	unstructured *unstructured.Unstructured,
+	preferredGVR schema.GroupVersionResource,
+) (bool, []FileForArchive) {
 	backedUpItem, updateFiles, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, true, true)
 	if aggregate, ok := err.(kubeerrs.Aggregate); ok {
 		log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors()))
@@ -548,12 +577,15 @@ func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
 	return nil
 }

-func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
+func (kb *kubernetesBackupper) FinalizeBackup(
+	log logrus.FieldLogger,
 	backupRequest *Request,
 	inBackupFile io.Reader,
 	outBackupFile io.Writer,
 	backupItemActionResolver framework.BackupItemActionResolverV2,
-	asyncBIAOperations []*itemoperation.BackupOperation) error {
+	asyncBIAOperations []*itemoperation.BackupOperation,
+	volumeInfos []*volume.VolumeInfo,
+) error {
 	gzw := gzip.NewWriter(outBackupFile)
 	defer gzw.Close()
 	tw := tar.NewWriter(gzw)
@@ -642,6 +674,8 @@ func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
 				return
 			}

+			updateVolumeInfos(volumeInfos, unstructured, item.groupResource, log)
+
 			backedUp, itemFiles := kb.finalizeItem(log, item.groupResource, itemBackupper, &unstructured, item.preferredGVR)
 			if backedUp {
 				backedUpGroupResources[item.groupResource] = true
@@ -730,3 +764,32 @@ type tarWriter interface {
 	Write([]byte) (int, error)
 	WriteHeader(*tar.Header) error
 }
+
+func updateVolumeInfos(
+	volumeInfos []*volume.VolumeInfo,
+	unstructured unstructured.Unstructured,
+	groupResource schema.GroupResource,
+	log logrus.FieldLogger,
+) {
+	switch groupResource.String() {
+	case kuberesource.DataUploads.String():
+		var dataUpload velerov2alpha1.DataUpload
+		err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), &dataUpload)
+		if err != nil {
+			log.WithError(err).Errorf("fail to convert DataUpload: %s/%s",
+				unstructured.GetNamespace(), unstructured.GetName())
+		}
+
+		for index := range volumeInfos {
+			if volumeInfos[index].PVCName == dataUpload.Spec.SourcePVC &&
+				volumeInfos[index].PVCNamespace == dataUpload.Spec.SourceNamespace {
+				if dataUpload.Status.CompletionTimestamp != nil {
+					volumeInfos[index].CompletionTimestamp = dataUpload.Status.CompletionTimestamp
+				}
+				volumeInfos[index].SnapshotDataMovementInfo.SnapshotHandle = dataUpload.Status.SnapshotID
+				volumeInfos[index].SnapshotDataMovementInfo.RetainedSnapshot = dataUpload.Spec.CSISnapshot.VolumeSnapshot
+				volumeInfos[index].SnapshotDataMovementInfo.Size = dataUpload.Status.Progress.TotalBytes
+			}
+		}
+	}
+}
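One thing to be aware of in updateVolumeInfos: the RetainedSnapshot assignment dereferences dataUpload.Spec.CSISnapshot without a nil check, so this path assumes CSI-snapshot-based data movement. A defensive variant would look like this (a sketch, not part of this PR):

// Guard the nil case before reading the retained snapshot name; a DataUpload
// created without a CSI snapshot spec would otherwise panic here.
if dataUpload.Spec.CSISnapshot != nil {
	volumeInfos[index].SnapshotDataMovementInfo.RetainedSnapshot = dataUpload.Spec.CSISnapshot.VolumeSnapshot
}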
@@ -43,7 +43,9 @@ import (

 	"github.com/vmware-tanzu/velero/internal/resourcepolicies"
 	"github.com/vmware-tanzu/velero/internal/volume"
+	"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
 	velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
 	"github.com/vmware-tanzu/velero/pkg/builder"
 	"github.com/vmware-tanzu/velero/pkg/client"
 	"github.com/vmware-tanzu/velero/pkg/discovery"
@@ -4433,3 +4435,59 @@ func TestBackupNamespaces(t *testing.T) {
 		})
 	}
 }
+
+func TestUpdateVolumeInfos(t *testing.T) {
+	logger := logrus.StandardLogger()
+	// The unstructured conversion will lose the time precision to the second
+	// level. To make the test pass, set now's precision to the second at the
+	// beginning.
+	now := metav1.Now().Rfc3339Copy()
+	volumeInfos := []*volume.VolumeInfo{
+		{
+			PVCName:                  "pvc1",
+			PVCNamespace:             "ns1",
+			SnapshotDataMovementInfo: &volume.SnapshotDataMovementInfo{},
+		},
+	}
+	dataUpload := velerov2alpha1.DataUpload{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "du1",
+			Namespace: "velero",
+		},
+		Spec: velerov2alpha1.DataUploadSpec{
+			SourcePVC:       "pvc1",
+			SourceNamespace: "ns1",
+			CSISnapshot: &velerov2alpha1.CSISnapshotSpec{
+				VolumeSnapshot: "vs1",
+			},
+		},
+		Status: velerov2alpha1.DataUploadStatus{
+			CompletionTimestamp: &now,
+			SnapshotID:          "snapshot1",
+			Progress: shared.DataMoveOperationProgress{
+				TotalBytes: 10000,
+			},
+		},
+	}
+	duMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&dataUpload)
+	require.NoError(t, err)
+
+	expectedVolumeInfos := []*volume.VolumeInfo{
+		{
+			PVCName:             "pvc1",
+			PVCNamespace:        "ns1",
+			CompletionTimestamp: &now,
+			SnapshotDataMovementInfo: &volume.SnapshotDataMovementInfo{
+				SnapshotHandle:   "snapshot1",
+				Size:             10000,
+				RetainedSnapshot: "vs1",
+			},
+		},
+	}
+
+	updateVolumeInfos(volumeInfos, unstructured.Unstructured{Object: duMap}, kuberesource.DataUploads, logger)
+
+	if len(expectedVolumeInfos) > 0 {
+		require.Equal(t, expectedVolumeInfos[0].SnapshotDataMovementInfo, volumeInfos[0].SnapshotDataMovementInfo)
+	}
+}
@@ -115,8 +115,14 @@ func (d *DataUploadBuilder) CSISnapshot(cSISnapshot *velerov2alpha1api.CSISnapsh
 }

 // StartTimestamp sets the DataUpload's StartTimestamp.
-func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBuilder {
-	d.object.Status.StartTimestamp = startTime
+func (d *DataUploadBuilder) StartTimestamp(startTimestamp *metav1.Time) *DataUploadBuilder {
+	d.object.Status.StartTimestamp = startTimestamp
 	return d
 }

+// CompletionTimestamp sets the DataUpload's CompletionTimestamp.
+func (d *DataUploadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataUploadBuilder {
+	d.object.Status.CompletionTimestamp = completionTimestamp
+	return d
+}
+
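The new setters chain like the existing builder methods. For example, the DataUpload tests elsewhere in this diff build their fixtures along these lines (now is a metav1.Time; the CompletionTimestamp call is added here purely for illustration):

du := builder.ForDataUpload("velero", "testDU").
	DataMover("velero").
	SnapshotID("testSnapshotHandle").
	StartTimestamp(&now).
	CompletionTimestamp(&now).
	Result()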
@@ -80,6 +80,16 @@ func (b *PodVolumeBackupBuilder) SnapshotID(snapshotID string) *PodVolumeBackupB
 	return b
 }

+func (b *PodVolumeBackupBuilder) StartTimestamp(startTimestamp *metav1.Time) *PodVolumeBackupBuilder {
+	b.object.Status.StartTimestamp = startTimestamp
+	return b
+}
+
+func (b *PodVolumeBackupBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *PodVolumeBackupBuilder {
+	b.object.Status.CompletionTimestamp = completionTimestamp
+	return b
+}
+
 // PodName sets the name of the pod associated with this PodVolumeBackup.
 func (b *PodVolumeBackupBuilder) PodName(name string) *PodVolumeBackupBuilder {
 	b.object.Spec.Pod.Name = name
@@ -44,6 +44,7 @@ import (
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
 	fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake"

+	"github.com/vmware-tanzu/velero/internal/volume"
 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
 	"github.com/vmware-tanzu/velero/pkg/builder"
@@ -78,9 +79,15 @@ func (b *fakeBackupper) BackupWithResolvers(logger logrus.FieldLogger, backup *p
 	return args.Error(0)
 }

-func (b *fakeBackupper) FinalizeBackup(logger logrus.FieldLogger, backup *pkgbackup.Request, inBackupFile io.Reader, outBackupFile io.Writer,
+func (b *fakeBackupper) FinalizeBackup(
+	logger logrus.FieldLogger,
+	backup *pkgbackup.Request,
+	inBackupFile io.Reader,
+	outBackupFile io.Writer,
 	backupItemActionResolver framework.BackupItemActionResolverV2,
-	asyncBIAOperations []*itemoperation.BackupOperation) error {
+	asyncBIAOperations []*itemoperation.BackupOperation,
+	volumeInfos []*volume.VolumeInfo,
+) error {
 	args := b.Called(logger, backup, inBackupFile, outBackupFile, backupItemActionResolver, asyncBIAOperations)
 	return args.Error(0)
 }
@@ -18,7 +18,9 @@ package controller

 import (
 	"bytes"
+	"compress/gzip"
 	"context"
+	"encoding/json"
 	"os"

 	"github.com/pkg/errors"
@@ -29,6 +31,7 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"

+	"github.com/vmware-tanzu/velero/internal/volume"
 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
 	"github.com/vmware-tanzu/velero/pkg/itemoperation"
@@ -111,7 +114,11 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 	original := backup.DeepCopy()
 	defer func() {
 		switch backup.Status.Phase {
-		case velerov1api.BackupPhaseCompleted, velerov1api.BackupPhasePartiallyFailed, velerov1api.BackupPhaseFailed, velerov1api.BackupPhaseFailedValidation:
+		case
+			velerov1api.BackupPhaseCompleted,
+			velerov1api.BackupPhasePartiallyFailed,
+			velerov1api.BackupPhaseFailed,
+			velerov1api.BackupPhaseFailedValidation:
 			r.backupTracker.Delete(backup.Namespace, backup.Name)
 		}
 		// Always attempt to Patch the backup object and status after each reconciliation.
@@ -150,8 +157,14 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
 	}
 	var outBackupFile *os.File
+	var volumeInfos []*volume.VolumeInfo
 	if len(operations) > 0 {
-		// Call itemBackupper.BackupItem for the list of items updated by async operations
+		volumeInfos, err = backupStore.GetBackupVolumeInfos(backup.Name)
+		if err != nil {
+			log.WithError(err).Error("error getting backup VolumeInfos")
+			return ctrl.Result{}, errors.WithStack(err)
+		}
+
 		log.Info("Setting up finalized backup temp file")
 		inBackupFile, err := downloadToTempFile(backup.Name, backupStore, log)
 		if err != nil {
@@ -172,7 +185,17 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 			return ctrl.Result{}, errors.WithStack(err)
 		}
 		backupItemActionsResolver := framework.NewBackupItemActionResolverV2(actions)
-		err = r.backupper.FinalizeBackup(log, backupRequest, inBackupFile, outBackupFile, backupItemActionsResolver, operations)
+
+		// Call itemBackupper.BackupItem for the list of items updated by async operations
+		err = r.backupper.FinalizeBackup(
+			log,
+			backupRequest,
+			inBackupFile,
+			outBackupFile,
+			backupItemActionsResolver,
+			operations,
+			volumeInfos,
+		)
 		if err != nil {
 			log.WithError(err).Error("error finalizing Backup")
 			return ctrl.Result{}, errors.WithStack(err)
@@ -209,6 +232,24 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		if err != nil {
 			return ctrl.Result{}, errors.Wrap(err, "error uploading backup final contents")
 		}
+
+		// Update the backup's VolumeInfos
+		backupVolumeInfoBuf := new(bytes.Buffer)
+		gzw := gzip.NewWriter(backupVolumeInfoBuf)
+		defer gzw.Close()
+
+		if err := json.NewEncoder(gzw).Encode(volumeInfos); err != nil {
+			return ctrl.Result{}, errors.Wrap(err, "error encoding VolumeInfos to JSON")
+		}
+
+		if err := gzw.Close(); err != nil {
+			return ctrl.Result{}, errors.Wrap(err, "error closing gzip writer")
+		}
+
+		err = backupStore.PutBackupVolumeInfos(backup.Name, backupVolumeInfoBuf)
+		if err != nil {
+			return ctrl.Result{}, errors.Wrap(err, "failed to upload backup VolumeInfos")
+		}
 	}
 	return ctrl.Result{}, nil
 }
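One detail in the block above: the explicit gzw.Close() before PutBackupVolumeInfos is what flushes the compressor and writes the gzip footer; the deferred Close then effectively becomes a no-op. A minimal illustration with the standard library:

buf := new(bytes.Buffer)
gzw := gzip.NewWriter(buf)
_ = json.NewEncoder(gzw).Encode([]string{"example"})
// buf is incomplete here: gzip.Writer buffers internally, so the footer
// (and possibly buffered data) has not been written yet, and a reader
// would hit an unexpected EOF.
_ = gzw.Close()
// Now buf holds a complete gzip stream, safe to hand to the object store.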
@@ -222,6 +222,8 @@ func TestBackupFinalizerReconcile(t *testing.T) {
 			backupStore.On("GetBackupContents", mock.Anything).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil)
 			backupStore.On("PutBackupContents", mock.Anything, mock.Anything).Return(nil)
 			backupStore.On("PutBackupMetadata", mock.Anything, mock.Anything).Return(nil)
+			backupStore.On("GetBackupVolumeInfos", mock.Anything).Return(nil, nil)
+			backupStore.On("PutBackupVolumeInfos", mock.Anything, mock.Anything).Return(nil)
 			pluginManager.On("GetBackupItemActionsV2").Return(nil, nil)
 			backupper.On("FinalizeBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, framework.BackupItemActionResolverV2{}, mock.Anything).Return(nil)
 			_, err := reconciler.Reconcile(context.TODO(), ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}})
@@ -35,4 +35,5 @@ var (
 	VolumeSnapshots        = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshots"}
 	VolumeSnapshotContents = schema.GroupResource{Group: "snapshot.storage.k8s.io", Resource: "volumesnapshotcontents"}
 	PriorityClasses        = schema.GroupResource{Group: "scheduling.k8s.io", Resource: "priorityclasses"}
+	DataUploads            = schema.GroupResource{Group: "velero.io", Resource: "datauploads"}
 )
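For reference, the switch in updateVolumeInfos above matches on GroupResource.String(), which renders the resource first and then the group, so this new entry compares as "datauploads.velero.io":

gr := schema.GroupResource{Group: "velero.io", Resource: "datauploads"}
fmt.Println(gr.String()) // prints "datauploads.velero.io"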
@@ -314,7 +314,7 @@ func (_m *BackupStore) GetRestoreItemOperations(name string) ([]*itemoperation.R
 	return r0, r1
 }

-// GetRestoreItemOperations provides a mock function with given fields: name
+// GetBackupVolumeInfos provides a mock function with given fields: name
 func (_m *BackupStore) GetBackupVolumeInfos(name string) ([]*volume.VolumeInfo, error) {
 	ret := _m.Called(name)

@@ -337,6 +337,20 @@ func (_m *BackupStore) GetBackupVolumeInfos(name string) ([]*volume.VolumeInfo,
 	return r0, r1
 }

+// PutBackupVolumeInfos provides a mock function with given fields: name, volumeInfo
+func (_m *BackupStore) PutBackupVolumeInfos(name string, volumeInfo io.Reader) error {
+	ret := _m.Called(name, volumeInfo)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(string, io.Reader) error); ok {
+		r0 = rf(name, volumeInfo)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
 // GetRestoreResults provides a mock function with given fields: name
 func (_m *BackupStore) GetRestoreResults(name string) (map[string]results.Result, error) {
 	ret := _m.Called(name)
@@ -75,6 +75,7 @@ type BackupStore interface {
 	GetCSIVolumeSnapshotContents(name string) ([]*snapshotv1api.VolumeSnapshotContent, error)
 	GetCSIVolumeSnapshotClasses(name string) ([]*snapshotv1api.VolumeSnapshotClass, error)
 	GetBackupVolumeInfos(name string) ([]*volume.VolumeInfo, error)
+	PutBackupVolumeInfos(name string, volumeInfo io.Reader) error
 	GetRestoreResults(name string) (map[string]results.Result, error)

 	// BackupExists checks if the backup metadata file exists in object storage.
@@ -516,6 +517,10 @@ func (s *objectBackupStore) GetBackupVolumeInfos(name string) ([]*volume.VolumeI
 	return volumeInfos, nil
 }

+func (s *objectBackupStore) PutBackupVolumeInfos(name string, volumeInfo io.Reader) error {
+	return s.objectStore.PutObject(s.bucket, s.layout.getBackupVolumeInfoKey(name), volumeInfo)
+}
+
 func (s *objectBackupStore) GetRestoreResults(name string) (map[string]results.Result, error) {
 	results := make(map[string]results.Result)

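PutBackupVolumeInfos delegates the object key to the store layout. getBackupVolumeInfoKey itself is not shown in this diff; judging from the key TestPutBackupVolumeInfos expects below ("backups/backup-1/backup-1-volumeinfo.json.gz"), it presumably resolves along these lines (a sketch of an assumed shape, not the actual implementation):

func (l *ObjectStoreLayout) getBackupVolumeInfoKey(backup string) string {
	// Assumed: rootPrefix is empty when no prefix is configured, matching
	// the layout helpers for the other backup artifacts.
	return path.Join(l.rootPrefix, "backups", backup, fmt.Sprintf("%s-volumeinfo.json.gz", backup))
}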
@@ -1203,6 +1203,51 @@ func TestGetRestoredResourceList(t *testing.T) {
 	assert.EqualValues(t, list["pod"], res["pod"])
 }

+func TestPutBackupVolumeInfos(t *testing.T) {
+	tests := []struct {
+		name         string
+		prefix       string
+		expectedErr  string
+		expectedKeys []string
+	}{
+		{
+			name:        "normal case",
+			expectedErr: "",
+			expectedKeys: []string{
+				"backups/backup-1/backup-1-volumeinfo.json.gz",
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			harness := newObjectBackupStoreTestHarness("foo", tc.prefix)
+
+			volumeInfos := []*volume.VolumeInfo{
+				{
+					PVCName: "test",
+				},
+			}
+
+			buf := new(bytes.Buffer)
+			gzw := gzip.NewWriter(buf)
+			defer gzw.Close()
+
+			require.NoError(t, json.NewEncoder(gzw).Encode(volumeInfos))
+			bufferContent := buf.Bytes()
+
+			err := harness.PutBackupVolumeInfos("backup-1", buf)
+
+			velerotest.AssertErrorMatches(t, tc.expectedErr, err)
+			assert.Len(t, harness.objectStore.Data[harness.bucket], len(tc.expectedKeys))
+			for _, key := range tc.expectedKeys {
+				assert.Contains(t, harness.objectStore.Data[harness.bucket], key)
+				assert.Equal(t, harness.objectStore.Data[harness.bucket][key], bufferContent)
+			}
+		})
+	}
+}
+
 func encodeToBytes(obj runtime.Object) []byte {
 	res, err := encode.Encode(obj, "json")
 	if err != nil {