Merge pull request #8603 from ywk253100/250113_pvb

[cherry-pick]Check the PVB status via podvolume Backupper rather than calling API server to avoid API server issue
pull/8608/head^2
Tiger Kaovilai 2025-01-13 14:22:17 +07:00 committed by GitHub
commit e92069247d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 200 additions and 42 deletions

View File

@ -0,0 +1 @@
Check the PVB status via podvolume Backupper rather than calling API server to avoid API server issue

View File

@ -35,10 +35,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@ -315,7 +313,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
var podVolumeBackupper podvolume.Backupper
if kb.podVolumeBackupperFactory != nil {
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err)
@ -745,6 +743,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()
// the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
@ -758,36 +757,19 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
}
}
// wait all PVBs of the item block pods to be processed
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}
podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}
pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
pvbs, err := itemBlock.itemBackupper.podVolumeBackupper.ListPodVolumeBackupsByPod(namespace, name)
if err != nil {
return errors.Wrapf(err, "failed to list PodVolumeBackups for pod %s/%s", namespace, name)
}
processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
for _, pvb := range pvbs {
pvbMap[pvb] = pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed
}
pvbMap[&pvbList.Items[i]] = processed
}
checkFunc := func(context.Context) (done bool, err error) {
@ -796,8 +778,8 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
if processed {
continue
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
if err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue

View File

@ -3945,7 +3945,7 @@ func TestBackupWithHooks(t *testing.T) {
type fakePodVolumeBackupperFactory struct{}
func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.Backup, string) (podvolume.Backupper, error) {
func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, logrus.FieldLogger, *velerov1.Backup, string) (podvolume.Backupper, error) {
return &fakePodVolumeBackupper{}, nil
}
@ -3978,6 +3978,24 @@ func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogg
return b.pvbs
}
func (b *fakePodVolumeBackupper) GetPodVolumeBackup(namespace, name string) (*velerov1.PodVolumeBackup, error) {
for _, pvb := range b.pvbs {
if pvb.Namespace == namespace && pvb.Name == name {
return pvb, nil
}
}
return nil, nil
}
func (b *fakePodVolumeBackupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1.PodVolumeBackup, error) {
var pvbs []*velerov1.PodVolumeBackup
for _, pvb := range b.pvbs {
if pvb.Spec.Pod.Namespace == podNamespace && pvb.Spec.Pod.Name == podName {
pvbs = append(pvbs, pvb)
}
}
return pvbs, nil
}
// TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
// and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
// are added to the Request object, and that when PVCs are backed up with PodVolume, the

View File

@ -48,6 +48,8 @@ type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
}
type backupper struct {
@ -59,7 +61,10 @@ type backupper struct {
pvbInformer ctrlcache.Informer
handlerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
result []*velerov1api.PodVolumeBackup
// pvbIndexer holds all PVBs created by this backuper and is capable to search
// the PVBs based on specific properties quickly because of the embedded indexes.
// The statuses of the PVBs are got updated when Informer receives update events.
pvbIndexer cache.Indexer
}
type skippedPVC struct {
@ -101,8 +106,22 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
}
}
const indexNamePod = "POD"
func podIndexFunc(obj interface{}) ([]string, error) {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
}
if pvb == nil {
return nil, errors.New("PodVolumeBackup is nil")
}
return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
}
func newBackupper(
ctx context.Context,
log logrus.FieldLogger,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
pvbInformer ctrlcache.Informer,
@ -118,13 +137,19 @@ func newBackupper(
uploaderType: uploaderType,
pvbInformer: pvbInformer,
wg: sync.WaitGroup{},
result: []*velerov1api.PodVolumeBackup{},
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}
b.handlerRegistration, _ = pvbInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pvb := obj.(*velerov1api.PodVolumeBackup)
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
log.Errorf("expected PodVolumeBackup, but got %T", obj)
return
}
if pvb.GetLabels()[velerov1api.BackupUIDLabel] != string(backup.UID) {
return
@ -135,7 +160,10 @@ func newBackupper(
return
}
b.result = append(b.result, pvb)
// the Indexer inserts PVB directly if the PVB to be updated doesn't exist
if err := b.pvbIndexer.Update(pvb); err != nil {
log.WithError(err).Errorf("failed to update PVB %s/%s in indexer", pvb.Namespace, pvb.Name)
}
b.wg.Done()
},
},
@ -318,6 +346,12 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
}
b.wg.Add(1)
if err := b.pvbIndexer.Add(volumeBackup); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to add PodVolumeBackup %s/%s to indexer", volumeBackup.Namespace, volumeBackup.Name))
continue
}
podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName)
}
@ -343,7 +377,12 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
case <-b.ctx.Done():
log.Error("timed out waiting for all PodVolumeBackups to complete")
case <-done:
for _, pvb := range b.result {
for _, obj := range b.pvbIndexer.List() {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
log.Errorf("expected PodVolumeBackup, but got %T", obj)
continue
}
podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
@ -353,6 +392,37 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
return podVolumeBackups
}
func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
obj, exist, err := b.pvbIndexer.GetByKey(cache.NewObjectName(namespace, name).String())
if err != nil {
return nil, err
}
if !exist {
return nil, nil
}
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
}
return pvb, nil
}
func (b *backupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
objs, err := b.pvbIndexer.ByIndex(indexNamePod, cache.NewObjectName(podNamespace, podName).String())
if err != nil {
return nil, err
}
var pvbs []*velerov1api.PodVolumeBackup
for _, obj := range objs {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
}
pvbs = append(pvbs, pvb)
}
return pvbs, nil
}
func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
for _, volumeName := range volumesToBackup {
log.WithError(err).Warnf("Skip pod volume %s", volumeName)

View File

@ -32,7 +32,7 @@ import (
// BackupperFactory can construct pod volumes backuppers.
type BackupperFactory interface {
// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
NewBackupper(context.Context, logrus.FieldLogger, *velerov1api.Backup, string) (Backupper, error)
}
func NewBackupperFactory(
@ -59,8 +59,8 @@ type backupperFactory struct {
log logrus.FieldLogger
}
func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
func (bf *backupperFactory) NewBackupper(ctx context.Context, log logrus.FieldLogger, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, log, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
return nil, errors.New("timed out waiting for caches to sync")

View File

@ -315,6 +315,7 @@ func TestBackupPodVolumes(t *testing.T) {
scheme := runtime.NewScheme()
velerov1api.AddToScheme(scheme)
corev1api.AddToScheme(scheme)
log := logrus.New()
tests := []struct {
name string
@ -594,7 +595,7 @@ func TestBackupPodVolumes(t *testing.T) {
backupObj.Spec.StorageLocation = test.bsl
factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, fakeCtrlClient, pvbInformer, velerotest.NewLogger())
bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)
bp, err := factory.NewBackupper(ctx, log, backupObj, test.uploaderType)
require.NoError(t, err)
@ -619,6 +620,91 @@ func TestBackupPodVolumes(t *testing.T) {
}
}
func TestGetPodVolumeBackup(t *testing.T) {
backupper := &backupper{
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}
obj := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb",
},
Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "pod",
},
},
}
err := backupper.pvbIndexer.Add(obj)
require.NoError(t, err)
// not exist PVB
pvb, err := backupper.GetPodVolumeBackup("invalid-namespace", "invalid-name")
require.NoError(t, err)
assert.Nil(t, pvb)
// exist PVB
pvb, err = backupper.GetPodVolumeBackup("velero", "pvb")
require.NoError(t, err)
assert.NotNil(t, pvb)
}
func TestListPodVolumeBackupsByPodp(t *testing.T) {
backupper := &backupper{
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}
obj1 := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb1",
},
Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "pod",
},
},
}
obj2 := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb2",
},
Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "pod",
},
},
}
err := backupper.pvbIndexer.Add(obj1)
require.NoError(t, err)
err = backupper.pvbIndexer.Add(obj2)
require.NoError(t, err)
// not exist PVBs
pvbs, err := backupper.ListPodVolumeBackupsByPod("invalid-namespace", "invalid-name")
require.NoError(t, err)
assert.Empty(t, pvbs)
// exist PVBs
pvbs, err = backupper.ListPodVolumeBackupsByPod("default", "pod")
require.NoError(t, err)
assert.Len(t, pvbs, 2)
}
type logHook struct {
entry *logrus.Entry
}
@ -636,6 +722,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
defer func() {
cancelFunc()
}()
log := logrus.New()
cases := []struct {
name string
ctx context.Context
@ -691,7 +778,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
logHook := &logHook{}
logger.Hooks.Add(logHook)
backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper := newBackupper(c.ctx, log, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper.wg.Add(1)
if c.statusToBeUpdated != nil {