Merge pull request #8679 from ywk253100/250211_waitgroup

Fix WaitGroup panic issue
pull/8287/merge
Wenkai Yin(尹文开) 2025-02-13 11:05:05 +08:00 committed by GitHub
commit c8e623864f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 52 additions and 25 deletions

View File

@ -0,0 +1 @@
Fix #8657: WaitGroup panic issue

View File

@ -780,7 +780,7 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
if processed { if processed {
continue continue
} }
updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name) updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackupByPodAndVolume(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Spec.Volume)
if err != nil { if err != nil {
allProcessed = false allProcessed = false
log.Infof("failed to get PVB: %v", err) log.Infof("failed to get PVB: %v", err)

View File

@ -3978,9 +3978,9 @@ func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogg
return b.pvbs return b.pvbs
} }
func (b *fakePodVolumeBackupper) GetPodVolumeBackup(namespace, name string) (*velerov1.PodVolumeBackup, error) { func (b *fakePodVolumeBackupper) GetPodVolumeBackupByPodAndVolume(podNamespace, podName, volume string) (*velerov1.PodVolumeBackup, error) {
for _, pvb := range b.pvbs { for _, pvb := range b.pvbs {
if pvb.Namespace == namespace && pvb.Name == name { if pvb.Spec.Pod.Namespace == podNamespace && pvb.Spec.Pod.Name == podName && pvb.Spec.Volume == volume {
return pvb, nil return pvb, nil
} }
} }

View File

@ -43,12 +43,17 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/kube" "github.com/vmware-tanzu/velero/pkg/util/kube"
) )
const (
indexNamePod = "POD"
pvbKeyPattern = "%s+%s+%s"
)
// Backupper can execute pod volume backups of volumes in a pod. // Backupper can execute pod volume backups of volumes in a pod.
type Backupper interface { type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod. // BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) GetPodVolumeBackupByPodAndVolume(podNamespace, podName, volume string) (*velerov1api.PodVolumeBackup, error)
ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
} }
@ -106,8 +111,6 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
} }
} }
const indexNamePod = "POD"
func podIndexFunc(obj any) ([]string, error) { func podIndexFunc(obj any) ([]string, error) {
pvb, ok := obj.(*velerov1api.PodVolumeBackup) pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok { if !ok {
@ -119,6 +122,16 @@ func podIndexFunc(obj any) ([]string, error) {
return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
} }
// the PVB's name is auto-generated when creating the PVB, we cannot get the name or uid before creating it.
// So we cannot use namespace&name or uid as the key because we need to insert PVB into the indexer before creating it in API server
func podVolumeBackupKey(obj any) (string, error) {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
return "", fmt.Errorf("expected PodVolumeBackup, but got %T", obj)
}
return fmt.Sprintf(pvbKeyPattern, pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Spec.Volume), nil
}
func newBackupper( func newBackupper(
ctx context.Context, ctx context.Context,
log logrus.FieldLogger, log logrus.FieldLogger,
@ -137,7 +150,7 @@ func newBackupper(
uploaderType: uploaderType, uploaderType: uploaderType,
pvbInformer: pvbInformer, pvbInformer: pvbInformer,
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ pvbIndexer: cache.NewIndexer(podVolumeBackupKey, cache.Indexers{
indexNamePod: podIndexFunc, indexNamePod: podIndexFunc,
}), }),
} }
@ -341,16 +354,22 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
} }
volumeBackup := newPodVolumeBackup(backup, pod, volume, repoIdentifier, b.uploaderType, pvc) volumeBackup := newPodVolumeBackup(backup, pod, volume, repoIdentifier, b.uploaderType, pvc)
if err := veleroclient.CreateRetryGenerateName(b.crClient, b.ctx, volumeBackup); err != nil { // the PVB must be added into the indexer before creating it in API server otherwise unexpected behavior may happen:
errs = append(errs, err) // the PVB may be handled very quickly by the controller and the informer handler will insert the PVB before "b.pvbIndexer.Add(volumeBackup)" runs,
continue // this causes the PVB inserted by "b.pvbIndexer.Add(volumeBackup)" overrides the PVB in the indexer while the PVB inserted by "b.pvbIndexer.Add(volumeBackup)"
} // contains empty "Status"
b.wg.Add(1)
if err := b.pvbIndexer.Add(volumeBackup); err != nil { if err := b.pvbIndexer.Add(volumeBackup); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to add PodVolumeBackup %s/%s to indexer", volumeBackup.Namespace, volumeBackup.Name)) errs = append(errs, errors.Wrapf(err, "failed to add PodVolumeBackup %s/%s to indexer", volumeBackup.Namespace, volumeBackup.Name))
continue continue
} }
// similar with above: the PVB may be handled very quickly by the controller and the informer handler will call "b.wg.Done()" before "b.wg.Add(1)" runs which causes panic
// see https://github.com/vmware-tanzu/velero/issues/8657
b.wg.Add(1)
if err := veleroclient.CreateRetryGenerateName(b.crClient, b.ctx, volumeBackup); err != nil {
b.wg.Done()
errs = append(errs, err)
continue
}
podVolumeBackups = append(podVolumeBackups, volumeBackup) podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName) pvcSummary.addBackedup(volumeName)
@ -392,8 +411,8 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
return podVolumeBackups return podVolumeBackups
} }
func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) { func (b *backupper) GetPodVolumeBackupByPodAndVolume(podNamespace, podName, volume string) (*velerov1api.PodVolumeBackup, error) {
obj, exist, err := b.pvbIndexer.GetByKey(cache.NewObjectName(namespace, name).String()) obj, exist, err := b.pvbIndexer.GetByKey(fmt.Sprintf(pvbKeyPattern, podNamespace, podName, volume))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -620,37 +620,44 @@ func TestBackupPodVolumes(t *testing.T) {
} }
} }
func TestGetPodVolumeBackup(t *testing.T) { func TestGetPodVolumeBackupByPodAndVolume(t *testing.T) {
backupper := &backupper{ backupper := &backupper{
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ pvbIndexer: cache.NewIndexer(podVolumeBackupKey, cache.Indexers{
indexNamePod: podIndexFunc, indexNamePod: podIndexFunc,
}), }),
} }
obj := &velerov1api.PodVolumeBackup{ obj := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb",
},
Spec: velerov1api.PodVolumeBackupSpec{ Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{ Pod: corev1api.ObjectReference{
Kind: "Pod", Kind: "Pod",
Namespace: "default", Namespace: "default",
Name: "pod", Name: "pod",
}, },
Volume: "volume",
}, },
} }
err := backupper.pvbIndexer.Add(obj) err := backupper.pvbIndexer.Add(obj)
require.NoError(t, err) require.NoError(t, err)
// not exist PVB // incorrect pod namespace
pvb, err := backupper.GetPodVolumeBackup("invalid-namespace", "invalid-name") pvb, err := backupper.GetPodVolumeBackupByPodAndVolume("invalid-namespace", "pod", "volume")
require.NoError(t, err) require.NoError(t, err)
assert.Nil(t, pvb) assert.Nil(t, pvb)
// exist PVB // incorrect pod name
pvb, err = backupper.GetPodVolumeBackup("velero", "pvb") pvb, err = backupper.GetPodVolumeBackupByPodAndVolume("default", "invalid-pod", "volume")
require.NoError(t, err)
assert.Nil(t, pvb)
// incorrect volume
pvb, err = backupper.GetPodVolumeBackupByPodAndVolume("default", "pod", "invalid-volume")
require.NoError(t, err)
assert.Nil(t, pvb)
// correct pod namespace, name and volume
pvb, err = backupper.GetPodVolumeBackupByPodAndVolume("default", "pod", "volume")
require.NoError(t, err) require.NoError(t, err)
assert.NotNil(t, pvb) assert.NotNil(t, pvb)
} }