Patch newly dynamically provisioned PVs with volume info to restore their custom settings

Signed-off-by: allenxu404 <qix2@vmware.com>
pull/7504/head
allenxu404 2024-03-18 17:32:35 +08:00
parent 6ec1701b27
commit 67b5e82d49
10 changed files with 569 additions and 20 deletions


@ -0,0 +1 @@
Patch newly dynamically provisioned PVs with volume info to restore their custom settings


@ -108,3 +108,9 @@ func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelect
}
return b
}
// Phase sets the PersistentVolume's phase.
func (b *PersistentVolumeBuilder) Phase(phase corev1api.PersistentVolumePhase) *PersistentVolumeBuilder {
b.object.Status.Phase = phase
return b
}
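
For reference, the unit tests added later in this change compose the new method with the existing builder chain like so:

	builder.ForPersistentVolume("new-pv1").
		ClaimRef("ns1", "pvc1").
		Phase(corev1api.VolumeBound).
		ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).
		Result()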


@ -997,6 +997,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
newPluginManager,
backupStoreGetter,
s.metrics,
s.crClient,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer)
}


@ -18,24 +18,36 @@ package controller
import (
"context"
"fmt"
"regexp"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"
internalVolume "github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/restore"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/results"
)
const (
PVPatchMaximumDuration = 10 * time.Minute
)
type restoreFinalizerReconciler struct {
client.Client
namespace string
@ -44,6 +56,7 @@ type restoreFinalizerReconciler struct {
backupStoreGetter persistence.ObjectBackupStoreGetter
metrics *metrics.ServerMetrics
clock clock.WithTickerAndDelayedExecution
crClient client.Client
}
func NewRestoreFinalizerReconciler(
@ -53,6 +66,7 @@ func NewRestoreFinalizerReconciler(
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
crClient client.Client,
) *restoreFinalizerReconciler {
return &restoreFinalizerReconciler{
Client: client,
@ -62,6 +76,7 @@ func NewRestoreFinalizerReconciler(
backupStoreGetter: backupStoreGetter,
metrics: metrics,
clock: &clock.RealClock{},
crClient: crClient,
}
}
@ -123,7 +138,27 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, errors.Wrap(err, "error getting backup store")
}
volumeInfo, err := backupStore.GetBackupVolumeInfos(restore.Spec.BackupName)
if err != nil {
log.WithError(err).Errorf("error getting volumeInfo for backup %s", restore.Spec.BackupName)
return ctrl.Result{}, errors.Wrap(err, "error getting volumeInfo")
}
restoredResourceList, err := backupStore.GetRestoredResourceList(restore.Name)
if err != nil {
log.WithError(err).Error("error getting restoredResourceList")
return ctrl.Result{}, errors.Wrap(err, "error getting restoredResourceList")
}
restoredPVCList := getRestoredPVCFromRestoredResourceList(restoredResourceList)
finalizerCtx := &finalizerContext{
logger: log,
restore: restore,
crClient: r.crClient,
volumeInfo: volumeInfo,
restoredPVCList: restoredPVCList,
}
warnings, errs := finalizerCtx.execute()
warningCnt := len(warnings.Velero) + len(warnings.Cluster)
@ -200,14 +235,160 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R
// finalizerContext includes all the dependencies required by finalization tasks and
// an execute() function that runs the task logic in order.
type finalizerContext struct {
logger logrus.FieldLogger
restore *velerov1api.Restore
crClient client.Client
volumeInfo []*internalVolume.VolumeInfo
restoredPVCList map[string]struct{}
}
func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam)
warnings, errs := results.Result{}, results.Result{}
// implement finalization tasks
ctx.logger.Debug("Starting to run execute()")
pdpErrs := ctx.patchDynamicPVWithVolumeInfo()
errs.Merge(&pdpErrs)
return warnings, errs
}
// patchDynamicPVWithVolumeInfo patches newly dynamically provisioned PVs using volume info
// in order to restore custom settings that would otherwise be lost during dynamic PV recreation.
func (ctx *finalizerContext) patchDynamicPVWithVolumeInfo() (errs results.Result) {
ctx.logger.Info("patching newly dynamically provisioned PV starts")
var pvWaitGroup sync.WaitGroup
var resultLock sync.Mutex
maxConcurrency := 3
semaphore := make(chan struct{}, maxConcurrency)
for _, volumeItem := range ctx.volumeInfo {
if (volumeItem.BackupMethod == internalVolume.PodVolumeBackup || volumeItem.BackupMethod == internalVolume.CSISnapshot) && volumeItem.PVInfo != nil {
// Determine restored PVC namespace
restoredNamespace := volumeItem.PVCNamespace
if remapped, ok := ctx.restore.Spec.NamespaceMapping[restoredNamespace]; ok {
restoredNamespace = remapped
}
// Check if PVC was restored in previous phase
pvcKey := fmt.Sprintf("%s/%s", restoredNamespace, volumeItem.PVCName)
if _, restored := ctx.restoredPVCList[pvcKey]; !restored {
continue
}
pvWaitGroup.Add(1)
go func(volInfo internalVolume.VolumeInfo, restoredNamespace string) {
defer pvWaitGroup.Done()
semaphore <- struct{}{}
log := ctx.logger.WithField("PVC", volInfo.PVCName).WithField("PVCNamespace", restoredNamespace)
log.Debug("patching dynamic PV is in progress")
err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, PVPatchMaximumDuration, true, func(context.Context) (bool, error) {
// wait for PVC to be bound
pvc := &v1.PersistentVolumeClaim{}
err := ctx.crClient.Get(context.Background(), client.ObjectKey{Name: volInfo.PVCName, Namespace: restoredNamespace}, pvc)
if apierrors.IsNotFound(err) {
log.Debug("error not finding PVC")
return false, nil
}
if err != nil {
return false, err
}
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
log.Debugf("PVC: %s not ready", pvc.Name)
return false, nil
}
// wait for PV to be bound
pvName := pvc.Spec.VolumeName
pv := &v1.PersistentVolume{}
err = ctx.crClient.Get(context.Background(), client.ObjectKey{Name: pvName}, pv)
if apierrors.IsNotFound(err) {
log.Debugf("error not finding PV: %s", pvName)
return false, nil
}
if err != nil {
return false, err
}
if pv.Spec.ClaimRef == nil || pv.Status.Phase != v1.VolumeBound {
log.Debugf("PV: %s not ready", pvName)
return false, nil
}
// validate PV
if pv.Spec.ClaimRef.Name != pvc.Name || pv.Spec.ClaimRef.Namespace != restoredNamespace {
return false, fmt.Errorf("PV was bound by unexpected PVC, unexpected PVC: %s/%s, expected PVC: %s/%s",
pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name, restoredNamespace, pvc.Name)
}
// patch PV's reclaim policy and label using the corresponding data stored in volume info
if needPatch(pv, volInfo.PVInfo) {
updatedPV := pv.DeepCopy()
updatedPV.Labels = volInfo.PVInfo.Labels
updatedPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimPolicy(volInfo.PVInfo.ReclaimPolicy)
if err := kubeutil.PatchResource(pv, updatedPV, ctx.crClient); err != nil {
return false, err
}
log.Infof("newly dynamically provisioned PV:%s has been patched using volume info", pvName)
}
return true, nil
})
if err != nil {
err = fmt.Errorf("fail to patch dynamic PV, err: %s, PVC: %s, PV: %s", err, volInfo.PVCName, volInfo.PVName)
ctx.logger.WithError(errors.WithStack((err))).Error("err patching dynamic PV using volume info")
resultLock.Lock()
defer resultLock.Unlock()
errs.Add(restoredNamespace, err)
}
<-semaphore
}(*volumeItem, restoredNamespace)
}
}
pvWaitGroup.Wait()
ctx.logger.Info("patching newly dynamically provisioned PV ends")
return errs
}
func getRestoredPVCFromRestoredResourceList(restoredResourceList map[string][]string) map[string]struct{} {
pvcKey := "v1/PersistentVolumeClaim"
pvcList := make(map[string]struct{})
for _, pvc := range restoredResourceList[pvcKey] {
// each PVC entry in restoredResourceList has the format "namespace/pvcName(status)";
// extract the substring before "(created)" when the status in the rightmost parentheses is "created"
r := regexp.MustCompile(`\(([^)]+)\)`)
matches := r.FindAllStringSubmatch(pvc, -1)
if len(matches) > 0 && matches[len(matches)-1][1] == restore.ItemRestoreResultCreated {
pvcList[pvc[:len(pvc)-len("(created)")]] = struct{}{}
}
}
return pvcList
}
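
A worked example of the extraction, using the inputs exercised by TestGetRestoredPVCFromRestoredResourceList later in this change; note that a PVC name may itself contain parentheses, so only the rightmost pair decides:

	getRestoredPVCFromRestoredResourceList(map[string][]string{
		"v1/PersistentVolumeClaim": {
			"namespace1/pvc1(created)",   // kept as "namespace1/pvc1"
			"namespace2/pvc2(updated)",   // dropped: status is not "created"
			"namespace3/pvc(3)(created)", // kept as "namespace3/pvc(3)"
		},
	})
	// returns map[string]struct{}{"namespace1/pvc1": {}, "namespace3/pvc(3)": {}}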
func needPatch(newPV *v1.PersistentVolume, pvInfo *internalVolume.PVInfo) bool {
if newPV.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimPolicy(pvInfo.ReclaimPolicy) {
return true
}
newPVLabels, pvLabels := newPV.Labels, pvInfo.Labels
for k, v := range pvLabels {
if _, ok := newPVLabels[k]; !ok {
return true
}
if newPVLabels[k] != v {
return true
}
}
return false
}
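
Note that the comparison is one-directional: needPatch reports true when the backed-up reclaim policy differs or when any backed-up label is missing or changed on the new PV, while labels that exist only on the new PV do not by themselves trigger a patch. A minimal sketch with hypothetical values:

	newPV := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			// "extra" is present only on the new PV
			Labels: map[string]string{"label1": "label1-val", "extra": "x"},
		},
		Spec: v1.PersistentVolumeSpec{
			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
		},
	}
	pvInfo := &internalVolume.PVInfo{
		ReclaimPolicy: string(v1.PersistentVolumeReclaimDelete),
		Labels:        map[string]string{"label1": "label1-val"},
	}
	needPatch(newPV, pvInfo) // false: the extra label alone does not force a patch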


@ -31,6 +31,10 @@ import (
"github.com/stretchr/testify/mock"
corev1api "k8s.io/api/core/v1"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/metrics"
@ -130,24 +134,24 @@ func TestRestoreFinalizerReconcile(t *testing.T) {
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeSingleObjectBackupStoreGetter(backupStore),
metrics.NewServerMetrics(),
fakeClient,
)
r.clock = testclocks.NewFakeClock(now)
if test.restore != nil && test.restore.Namespace == velerov1api.DefaultNamespace {
require.NoError(t, r.Client.Create(context.Background(), test.restore))
backupStore.On("GetRestoredResourceList", test.restore.Name).Return(map[string][]string{}, nil)
}
if test.backup != nil {
assert.NoError(t, r.Client.Create(context.Background(), test.backup))
backupStore.On("GetBackupVolumeInfos", test.backup.Name).Return(nil, nil)
pluginManager.On("GetRestoreItemActionsV2").Return(nil, nil)
pluginManager.On("CleanupClients")
}
if test.location != nil {
require.NoError(t, r.Client.Create(context.Background(), test.location))
}
_, err = r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: test.restore.Namespace,
Name: test.restore.Name,
@ -192,6 +196,7 @@ func TestUpdateResult(t *testing.T) {
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeSingleObjectBackupStoreGetter(backupStore),
metrics.NewServerMetrics(),
fakeClient,
)
restore := builder.ForRestore(velerov1api.DefaultNamespace, "restore-1").Result()
res := map[string]results.Result{"warnings": {}, "errors": {}}
@ -202,3 +207,287 @@ func TestUpdateResult(t *testing.T) {
err := r.updateResults(backupStore, restore, &results.Result{}, &results.Result{})
require.NoError(t, err)
}
func TestPatchDynamicPVWithVolumeInfo(t *testing.T) {
tests := []struct {
name string
volumeInfo []*volume.VolumeInfo
restoredPVCNames map[string]struct{}
restore *velerov1api.Restore
restoredPVC []*corev1api.PersistentVolumeClaim
restoredPV []*corev1api.PersistentVolume
expectedPatch map[string]volume.PVInfo
expectedErrNum int
}{
{
name: "no applicable volumeInfo",
volumeInfo: []*volume.VolumeInfo{{BackupMethod: "VeleroNativeSnapshot", PVCName: "pvc1"}},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
expectedPatch: nil,
expectedErrNum: 0,
},
{
name: "no restored PVC",
volumeInfo: []*volume.VolumeInfo{{BackupMethod: "PodVolumeBackup", PVCName: "pvc1"}},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
expectedPatch: nil,
expectedErrNum: 0,
},
{
name: "no applicable pv patch",
volumeInfo: []*volume.VolumeInfo{{
BackupMethod: "PodVolumeBackup",
PVCName: "pvc1",
PVName: "pv1",
PVCNamespace: "ns1",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
}},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
restoredPVCNames: map[string]struct{}{"ns1/pvc1": {}},
restoredPV: []*corev1api.PersistentVolume{
builder.ForPersistentVolume("new-pv1").ObjectMeta(builder.WithLabels("label1", "label1-val")).ClaimRef("ns1", "pvc1").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimDelete).Result()},
restoredPVC: []*corev1api.PersistentVolumeClaim{
builder.ForPersistentVolumeClaim("ns1", "pvc1").VolumeName("new-pv1").Phase(corev1api.ClaimBound).Result(),
},
expectedPatch: nil,
expectedErrNum: 0,
},
{
name: "an applicable pv patch",
volumeInfo: []*volume.VolumeInfo{{
BackupMethod: "PodVolumeBackup",
PVCName: "pvc1",
PVName: "pv1",
PVCNamespace: "ns1",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
}},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
restoredPVCNames: map[string]struct{}{"ns1/pvc1": {}},
restoredPV: []*corev1api.PersistentVolume{
builder.ForPersistentVolume("new-pv1").ClaimRef("ns1", "pvc1").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result()},
restoredPVC: []*corev1api.PersistentVolumeClaim{
builder.ForPersistentVolumeClaim("ns1", "pvc1").VolumeName("new-pv1").Phase(corev1api.ClaimBound).Result(),
},
expectedPatch: map[string]volume.PVInfo{"new-pv1": {
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
}},
expectedErrNum: 0,
},
{
name: "a mapped namespace restore",
volumeInfo: []*volume.VolumeInfo{{
BackupMethod: "PodVolumeBackup",
PVCName: "pvc1",
PVName: "pv1",
PVCNamespace: "ns2",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
}},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").NamespaceMappings("ns2", "ns1").Result(),
restoredPVCNames: map[string]struct{}{"ns1/pvc1": {}},
restoredPV: []*corev1api.PersistentVolume{
builder.ForPersistentVolume("new-pv1").ClaimRef("ns1", "pvc1").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result()},
restoredPVC: []*corev1api.PersistentVolumeClaim{
builder.ForPersistentVolumeClaim("ns1", "pvc1").VolumeName("new-pv1").Phase(corev1api.ClaimBound).Result(),
},
expectedPatch: map[string]volume.PVInfo{"new-pv1": {
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
}},
expectedErrNum: 0,
},
{
name: "two applicable pv patches",
volumeInfo: []*volume.VolumeInfo{{
BackupMethod: "PodVolumeBackup",
PVCName: "pvc1",
PVName: "pv1",
PVCNamespace: "ns1",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
},
{
BackupMethod: "CSISnapshot",
PVCName: "pvc2",
PVName: "pv2",
PVCNamespace: "ns2",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label2": "label2-val"},
},
},
},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
restoredPVCNames: map[string]struct{}{
"ns1/pvc1": {},
"ns2/pvc2": {},
},
restoredPV: []*corev1api.PersistentVolume{
builder.ForPersistentVolume("new-pv1").ClaimRef("ns1", "pvc1").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result(),
builder.ForPersistentVolume("new-pv2").ClaimRef("ns2", "pvc2").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result(),
},
restoredPVC: []*corev1api.PersistentVolumeClaim{
builder.ForPersistentVolumeClaim("ns1", "pvc1").VolumeName("new-pv1").Phase(corev1api.ClaimBound).Result(),
builder.ForPersistentVolumeClaim("ns2", "pvc2").VolumeName("new-pv2").Phase(corev1api.ClaimBound).Result(),
},
expectedPatch: map[string]volume.PVInfo{
"new-pv1": {
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
"new-pv2": {
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label2": "label2-val"},
},
},
expectedErrNum: 0,
},
{
name: "an applicable pv patch with bound error",
volumeInfo: []*volume.VolumeInfo{{
BackupMethod: "PodVolumeBackup",
PVCName: "pvc1",
PVName: "pv1",
PVCNamespace: "ns1",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
}},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
restoredPVCNames: map[string]struct{}{"ns1/pvc1": {}},
restoredPV: []*corev1api.PersistentVolume{
builder.ForPersistentVolume("new-pv1").ClaimRef("ns2", "pvc2").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result()},
restoredPVC: []*corev1api.PersistentVolumeClaim{
builder.ForPersistentVolumeClaim("ns1", "pvc1").VolumeName("new-pv1").Phase(corev1api.ClaimBound).Result(),
},
expectedErrNum: 1,
},
{
name: "two applicable pv patches with an error",
volumeInfo: []*volume.VolumeInfo{{
BackupMethod: "PodVolumeBackup",
PVCName: "pvc1",
PVName: "pv1",
PVCNamespace: "ns1",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
},
{
BackupMethod: "CSISnapshot",
PVCName: "pvc2",
PVName: "pv2",
PVCNamespace: "ns2",
PVInfo: &volume.PVInfo{
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label2": "label2-val"},
},
},
},
restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore").Result(),
restoredPVCNames: map[string]struct{}{
"ns1/pvc1": {},
"ns2/pvc2": {},
},
restoredPV: []*corev1api.PersistentVolume{
builder.ForPersistentVolume("new-pv1").ClaimRef("ns1", "pvc1").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result(),
builder.ForPersistentVolume("new-pv2").ClaimRef("ns3", "pvc3").Phase(corev1api.VolumeBound).ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).Result(),
},
restoredPVC: []*corev1api.PersistentVolumeClaim{
builder.ForPersistentVolumeClaim("ns1", "pvc1").VolumeName("new-pv1").Phase(corev1api.ClaimBound).Result(),
builder.ForPersistentVolumeClaim("ns2", "pvc2").VolumeName("new-pv2").Phase(corev1api.ClaimBound).Result(),
},
expectedPatch: map[string]volume.PVInfo{
"new-pv1": {
ReclaimPolicy: string(corev1api.PersistentVolumeReclaimDelete),
Labels: map[string]string{"label1": "label1-val"},
},
},
expectedErrNum: 1,
},
}
for _, tc := range tests {
var (
fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
logger = velerotest.NewLogger()
)
ctx := &finalizerContext{
logger: logger,
crClient: fakeClient,
restore: tc.restore,
restoredPVCList: tc.restoredPVCNames,
volumeInfo: tc.volumeInfo,
}
for _, pv := range tc.restoredPV {
require.NoError(t, ctx.crClient.Create(context.Background(), pv))
}
for _, pvc := range tc.restoredPVC {
require.NoError(t, ctx.crClient.Create(context.Background(), pvc))
}
errs := ctx.patchDynamicPVWithVolumeInfo()
if tc.expectedErrNum > 0 {
assert.Equal(t, tc.expectedErrNum, len(errs.Namespaces))
}
for pvName, expectedPVInfo := range tc.expectedPatch {
pv := &corev1api.PersistentVolume{}
err := ctx.crClient.Get(context.Background(), crclient.ObjectKey{Name: pvName}, pv)
assert.NoError(t, err)
assert.Equal(t, expectedPVInfo.ReclaimPolicy, string(pv.Spec.PersistentVolumeReclaimPolicy))
assert.Equal(t, expectedPVInfo.Labels, pv.Labels)
}
}
}
func TestGetRestoredPVCFromRestoredResourceList(t *testing.T) {
// test empty list
restoredResourceList := map[string][]string{}
actual := getRestoredPVCFromRestoredResourceList(restoredResourceList)
assert.Empty(t, actual)
// test no match
restoredResourceList = map[string][]string{
"v1/PersistentVolumeClaim": {
"namespace1/pvc1(updated)",
},
"v1/PersistentVolume": {
"namespace1/pv(created)",
},
}
actual = getRestoredPVCFromRestoredResourceList(restoredResourceList)
assert.Empty(t, actual)
// test matches
restoredResourceList = map[string][]string{
"v1/PersistentVolumeClaim": {
"namespace1/pvc1(created)",
"namespace2/pvc2(updated)",
"namespace3/pvc(3)(created)",
},
}
expected := map[string]struct{}{
"namespace1/pvc1": {},
"namespace3/pvc(3)": {},
}
actual = getRestoredPVCFromRestoredResourceList(restoredResourceList)
assert.Equal(t, expected, actual)
}


@ -361,6 +361,29 @@ func (_m *BackupStore) GetRestoreResults(name string) (map[string]results.Result
return r0, r1
}
// GetRestoredResourceList provides a mock function with given fields: name
func (_m *BackupStore) GetRestoredResourceList(name string) (map[string][]string, error) {
ret := _m.Called(name)
r0 := make(map[string][]string)
if rf, ok := ret.Get(0).(func(string) map[string][]string); ok {
r0 = rf(name)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string][]string)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(name)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// IsValid provides a mock function with given fields:
func (_m *BackupStore) IsValid() error {
ret := _m.Called()


@ -89,6 +89,7 @@ type BackupStore interface {
PutRestoreItemOperations(restore string, restoreItemOperations io.Reader) error
GetRestoreItemOperations(name string) ([]*itemoperation.RestoreOperation, error)
DeleteRestore(name string) error
GetRestoredResourceList(name string) (map[string][]string, error)
GetDownloadURL(target velerov1api.DownloadTarget) (string, error)
}
@ -638,6 +639,25 @@ func (s *objectBackupStore) GetDownloadURL(target velerov1api.DownloadTarget) (s
}
}
func (s *objectBackupStore) GetRestoredResourceList(name string) (map[string][]string, error) {
list := make(map[string][]string)
res, err := tryGet(s.objectStore, s.bucket, s.layout.getRestoreResourceListKey(name))
if err != nil {
return list, err
}
if res == nil {
return list, nil
}
defer res.Close()
if err := decode(res, &list); err != nil {
return list, err
}
return list, nil
}
func seekToBeginning(r io.Reader) error {
seeker, ok := r.(io.Seeker)
if !ok {


@ -1177,6 +1177,34 @@ func TestGetRestoreResults(t *testing.T) {
assert.EqualValues(t, contents["errors"], res["errors"])
}
func TestGetRestoredResourceList(t *testing.T) {
harness := newObjectBackupStoreTestHarness("test-bucket", "")
// file not found should not error
_, err := harness.GetRestoredResourceList("test-restore")
assert.NoError(t, err)
// file containing invalid data should error
harness.objectStore.PutObject(harness.bucket, "restores/test-restore/restore-test-restore-resource-list.json.gz", newStringReadSeeker("foo"))
_, err = harness.GetRestoredResourceList("test-restore")
assert.Error(t, err)
// file containing gzipped json data should return correctly
list := map[string][]string{
"pod": {"test-ns/pod1(created)", "test-ns/pod2(skipped)"},
}
obj := new(bytes.Buffer)
gzw := gzip.NewWriter(obj)
require.NoError(t, json.NewEncoder(gzw).Encode(list))
require.NoError(t, gzw.Close())
require.NoError(t, harness.objectStore.PutObject(harness.bucket, "restores/test-restore/restore-test-restore-resource-list.json.gz", obj))
res, err := harness.GetRestoredResourceList("test-restore")
assert.NoError(t, err)
assert.EqualValues(t, list["pod"], res["pod"])
}
func encodeToBytes(obj runtime.Object) []byte {
res, err := encode.Encode(obj, "json")
if err != nil {


@ -33,10 +33,10 @@ import (
)
const (
ItemRestoreResultCreated = "created"
ItemRestoreResultUpdated = "updated"
ItemRestoreResultFailed = "failed"
ItemRestoreResultSkipped = "skipped"
)
type itemKey struct {


@ -552,7 +552,7 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
var createdOrUpdatedCRDs bool
for _, restoredItem := range ctx.restoredItems {
if restoredItem.action == ItemRestoreResultCreated || restoredItem.action == ItemRestoreResultUpdated {
createdOrUpdatedCRDs = true
break
}
@ -757,7 +757,7 @@ func (ctx *restoreContext) processSelectedResource(
namespace: ns.Namespace,
name: ns.Name,
}
ctx.restoredItems[itemKey] = restoredItemStatus{action: itemRestoreResultCreated, itemExists: true}
ctx.restoredItems[itemKey] = restoredItemStatus{action: ItemRestoreResultCreated, itemExists: true}
}
// Keep track of namespaces that we know exist so we don't
@ -1156,7 +1156,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
namespace: nsToEnsure.Namespace,
name: nsToEnsure.Name,
}
ctx.restoredItems[itemKey] = restoredItemStatus{action: itemRestoreResultCreated, itemExists: true}
ctx.restoredItems[itemKey] = restoredItemStatus{action: ItemRestoreResultCreated, itemExists: true}
}
} else {
if boolptr.IsSetToFalse(ctx.restore.Spec.IncludeClusterResources) {
@ -1201,12 +1201,12 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
// no action specified, and no warnings and errors
if errs.IsEmpty() && warnings.IsEmpty() {
itemStatus.action = ItemRestoreResultSkipped
ctx.restoredItems[itemKey] = itemStatus
return
}
// others are all failed
itemStatus.action = ItemRestoreResultFailed
ctx.restoredItems[itemKey] = itemStatus
}()
@ -1529,7 +1529,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
createdObj, restoreErr = resourceClient.Create(obj)
if restoreErr == nil {
itemExists = true
ctx.restoredItems[itemKey] = restoredItemStatus{action: ItemRestoreResultCreated, itemExists: itemExists}
ctx.restoredItems[itemKey] = restoredItemStatus{action: ItemRestoreResultCreated, itemExists: itemExists}
}
}
@ -1610,7 +1610,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
errs.Merge(&errsFromUpdate)
}
} else {
itemStatus.action = ItemRestoreResultUpdated
ctx.restoredItems[itemKey] = itemStatus
ctx.log.Infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
}
@ -1630,7 +1630,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// processing update as existingResourcePolicy
warningsFromUpdateRP, errsFromUpdateRP := ctx.processUpdateResourcePolicy(fromCluster, fromClusterWithLabels, obj, namespace, resourceClient)
if warningsFromUpdateRP.IsEmpty() && errsFromUpdateRP.IsEmpty() {
itemStatus.action = ItemRestoreResultUpdated
ctx.restoredItems[itemKey] = itemStatus
}
warnings.Merge(&warningsFromUpdateRP)