Improve the concurrency for PVBs in different pods

Fixes #6676

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
pull/7571/head
Wenkai Yin(尹文开) 2024-03-25 11:46:09 +08:00
parent d640cc16ab
commit 8d10b68eda
8 changed files with 160 additions and 150 deletions
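In short, the backupper drops the per-pod results channel map (results plus resultsLock) and instead tracks in-flight PodVolumeBackups with a sync.WaitGroup and a shared result slice fed by the PVB informer's update handler. BackupPodVolumes now only creates the PVBs and returns, so PVBs for different pods can progress concurrently, and the backup controller waits once at the end through the new WaitAllPodVolumesProcessed method. A condensed, illustrative sketch of the resulting call pattern follows (the loop and the podVolumeBackupper/backupRequest names are placeholders standing in for the item backupper's walk over pods; error handling and the PVCBackupSummary are omitted):

    // The Backupper interface gains a wait method (see the interface hunk below).
    type Backupper interface {
        BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string,
            resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
        WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
    }

    // Caller side, condensed: create PVBs per pod without blocking on their completion ...
    for _, pod := range podsWithVolumesToBackUp {
        _, _, errs := podVolumeBackupper.BackupPodVolumes(backup, pod, volumesToBackup, nil, log)
        backupErrs = append(backupErrs, errs...)
    }
    // ... then wait a single time for every PVB to reach Completed or Failed.
    processedPVBs := podVolumeBackupper.WaitAllPodVolumesProcessed(log)
    backupRequest.PodVolumeBackups = append(backupRequest.PodVolumeBackups, processedPVBs...)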


@@ -0,0 +1 @@
Improve the concurrency for PVBs in different pods


@@ -419,6 +419,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
}
}
processedPVBs := itemBackupper.podVolumeBackupper.WaitAllPodVolumesProcessed(log)
backupRequest.PodVolumeBackups = append(backupRequest.PodVolumeBackups, processedPVBs...)
// do a final update on progress since we may have just added some CRDs and may not have updated
// for the last few processed items.
updated = backupRequest.Backup.DeepCopy()


@@ -3054,7 +3054,9 @@ func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.
return &fakePodVolumeBackupper{}, nil
}
type fakePodVolumeBackupper struct{}
type fakePodVolumeBackupper struct {
pvbs []*velerov1.PodVolumeBackup
}
// BackupPodVolumes returns one pod volume backup per entry in volumes, with namespace "velero"
// and name "pvb-<pod-namespace>-<pod-name>-<volume-name>".
@@ -3072,9 +3074,15 @@ func (b *fakePodVolumeBackupper) BackupPodVolumes(backup *velerov1.Backup, pod *
res = append(res, pvb)
}
b.pvbs = res
return res, pvcSummary, nil
}
func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1.PodVolumeBackup {
return b.pvbs
}
// TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
// and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
// are added to the Request object, and that when PVCs are backed up with PodVolume, the
@@ -3289,7 +3297,7 @@ func newHarness(t *testing.T) *harness {
// unsupported
podCommandExecutor: nil,
podVolumeBackupperFactory: nil,
podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
podVolumeTimeout: 0,
},
log: log,


@@ -270,7 +270,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
// even if there are errors.
podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes)
ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...)
backupErrs = append(backupErrs, errs...)
// Mark the volumes that have been processed by pod volume backup as Taken in the tracker.


@@ -45,17 +45,19 @@ import (
type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
}
type backupper struct {
ctx context.Context
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
crClient ctrlclient.Client
uploaderType string
results map[string]chan *velerov1api.PodVolumeBackup
resultsLock sync.Mutex
ctx context.Context
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
crClient ctrlclient.Client
uploaderType string
pvbInformer ctrlcache.Informer
handlerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
result []*velerov1api.PodVolumeBackup
}
type skippedPVC struct {
@@ -105,7 +107,6 @@ func newBackupper(
crClient ctrlclient.Client,
uploaderType string,
backup *velerov1api.Backup,
log logrus.FieldLogger,
) *backupper {
b := &backupper{
ctx: ctx,
@@ -113,11 +114,12 @@
repoEnsurer: repoEnsurer,
crClient: crClient,
uploaderType: uploaderType,
results: make(map[string]chan *velerov1api.PodVolumeBackup),
pvbInformer: pvbInformer,
wg: sync.WaitGroup{},
result: []*velerov1api.PodVolumeBackup{},
}
_, _ = pvbInformer.AddEventHandler(
b.handlerRegistration, _ = pvbInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pvb := obj.(*velerov1api.PodVolumeBackup)
@@ -126,17 +128,13 @@
return
}
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
b.resultsLock.Lock()
defer b.resultsLock.Unlock()
resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)]
if !ok {
log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name)
return
}
resChan <- pvb
if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted &&
pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed {
return
}
b.result = append(b.result, pvb)
b.wg.Done()
},
},
)
@@ -217,12 +215,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
b.repoLocker.Lock(repo.Name)
defer b.repoLocker.Unlock(repo.Name)
resultsChan := make(chan *velerov1api.PodVolumeBackup)
b.resultsLock.Lock()
b.results[resultsKey(pod.Namespace, pod.Name)] = resultsChan
b.resultsLock.Unlock()
var (
podVolumeBackups []*velerov1api.PodVolumeBackup
mountedPodVolumes = sets.Set[string]{}
@@ -243,7 +235,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
repoIdentifier = repo.Spec.ResticIdentifier
}
var numVolumeSnapshots int
for _, volumeName := range volumesToBackup {
volume, ok := podVolumes[volumeName]
if !ok {
@@ -305,32 +296,40 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
errs = append(errs, err)
continue
}
b.wg.Add(1)
podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName)
numVolumeSnapshots++
}
ForEachVolume:
for i, count := 0, numVolumeSnapshots; i < count; i++ {
select {
case <-b.ctx.Done():
errs = append(errs, errors.New("timed out waiting for all PodVolumeBackups to complete"))
break ForEachVolume
case res := <-resultsChan:
switch res.Status.Phase {
case velerov1api.PodVolumeBackupPhaseCompleted:
podVolumeBackups = append(podVolumeBackups, res)
case velerov1api.PodVolumeBackupPhaseFailed:
errs = append(errs, errors.Errorf("pod volume backup failed: %s", res.Status.Message))
podVolumeBackups = append(podVolumeBackups, res)
return podVolumeBackups, pvcSummary, errs
}
func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup {
defer func() {
if err := b.pvbInformer.RemoveEventHandler(b.handlerRegistration); err != nil {
log.Debugf("failed to remove the event handler for PVB: %v", err)
}
}()
done := make(chan struct{})
go func() {
defer close(done)
b.wg.Wait()
}()
var podVolumeBackups []*velerov1api.PodVolumeBackup
select {
case <-b.ctx.Done():
log.Error("timed out waiting for all PodVolumeBackups to complete")
case <-done:
for _, pvb := range b.result {
podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
}
}
}
b.resultsLock.Lock()
delete(b.results, resultsKey(pod.Namespace, pod.Name))
b.resultsLock.Unlock()
return podVolumeBackups, pvcSummary, errs
return podVolumeBackups
}
func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {


@@ -60,7 +60,7 @@ type backupperFactory struct {
}
func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup, bf.log)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
return nil, errors.New("timed out waiting for caches to sync")


@@ -29,6 +29,7 @@ import (
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
clientTesting "k8s.io/client-go/testing"
@@ -307,15 +308,8 @@ func TestBackupPodVolumes(t *testing.T) {
velerov1api.AddToScheme(scheme)
corev1api.AddToScheme(scheme)
ctxWithCancel, cancel := context.WithCancel(context.Background())
defer cancel()
failedPVB := createPVBObj(true, false, 1, "")
completedPVB := createPVBObj(false, false, 1, "")
tests := []struct {
name string
ctx context.Context
bsl string
uploaderType string
volumes []string
@@ -325,7 +319,6 @@ func TestBackupPodVolumes(t *testing.T) {
veleroClientObj []runtime.Object
veleroReactors []reactor
runtimeScheme *runtime.Scheme
retPVBs []*velerov1api.PodVolumeBackup
pvbs []*velerov1api.PodVolumeBackup
errs []string
}{
@@ -493,87 +486,11 @@
uploaderType: "kopia",
bsl: "fake-bsl",
},
{
name: "context canceled",
ctx: ctxWithCancel,
volumes: []string{
"fake-volume-1",
},
sourcePod: createPodObj(true, true, true, 1),
kubeClientObj: []runtime.Object{
createNodeAgentPodObj(true),
createPVCObj(1),
createPVObj(1, false),
},
ctlClientObj: []runtime.Object{
createBackupRepoObj(),
},
runtimeScheme: scheme,
uploaderType: "kopia",
bsl: "fake-bsl",
errs: []string{
"timed out waiting for all PodVolumeBackups to complete",
},
},
{
name: "return failed pvbs",
volumes: []string{
"fake-volume-1",
},
sourcePod: createPodObj(true, true, true, 1),
kubeClientObj: []runtime.Object{
createNodeAgentPodObj(true),
createPVCObj(1),
createPVObj(1, false),
},
ctlClientObj: []runtime.Object{
createBackupRepoObj(),
},
runtimeScheme: scheme,
uploaderType: "kopia",
bsl: "fake-bsl",
retPVBs: []*velerov1api.PodVolumeBackup{
failedPVB,
},
pvbs: []*velerov1api.PodVolumeBackup{
failedPVB,
},
errs: []string{
"pod volume backup failed: fake-message",
},
},
{
name: "return completed pvbs",
volumes: []string{
"fake-volume-1",
},
sourcePod: createPodObj(true, true, true, 1),
kubeClientObj: []runtime.Object{
createNodeAgentPodObj(true),
createPVCObj(1),
createPVObj(1, false),
},
ctlClientObj: []runtime.Object{
createBackupRepoObj(),
},
runtimeScheme: scheme,
uploaderType: "kopia",
bsl: "fake-bsl",
retPVBs: []*velerov1api.PodVolumeBackup{
completedPVB,
},
pvbs: []*velerov1api.PodVolumeBackup{
completedPVB,
},
},
}
// TODO add more verification around PVCBackupSummary returned by "BackupPodVolumes"
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
if test.ctx != nil {
ctx = test.ctx
}
fakeClientBuilder := ctrlfake.NewClientBuilder()
if test.runtimeScheme != nil {
@@ -606,18 +523,6 @@ func TestBackupPodVolumes(t *testing.T) {
require.NoError(t, err)
go func() {
if test.ctx != nil {
time.Sleep(time.Second)
cancel()
} else if test.retPVBs != nil {
time.Sleep(time.Second)
for _, pvb := range test.retPVBs {
bp.(*backupper).results[resultsKey(test.sourcePod.Namespace, test.sourcePod.Name)] <- pvb
}
}
}()
pvbs, _, errs := bp.BackupPodVolumes(backupObj, test.sourcePod, test.volumes, nil, velerotest.NewLogger())
if errs == nil {
@@ -633,6 +538,101 @@
}
}
type logHook struct {
entry *logrus.Entry
}
func (l *logHook) Levels() []logrus.Level {
return []logrus.Level{logrus.ErrorLevel}
}
func (l *logHook) Fire(entry *logrus.Entry) error {
l.entry = entry
return nil
}
func TestWaitAllPodVolumesProcessed(t *testing.T) {
timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
cases := []struct {
name string
ctx context.Context
statusToBeUpdated *velerov1api.PodVolumeBackupStatus
expectedErr string
expectedPVBPhase velerov1api.PodVolumeBackupPhase
}{
{
name: "context canceled",
ctx: timeoutCtx,
expectedErr: "timed out waiting for all PodVolumeBackups to complete",
},
{
name: "failed pvbs",
ctx: context.Background(),
statusToBeUpdated: &velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseFailed,
Message: "failed",
},
expectedPVBPhase: velerov1api.PodVolumeBackupPhaseFailed,
expectedErr: "pod volume backup failed: failed",
},
{
name: "completed pvbs",
ctx: context.Background(),
statusToBeUpdated: &velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseCompleted,
Message: "completed",
},
expectedPVBPhase: velerov1api.PodVolumeBackupPhaseCompleted,
},
}
for _, c := range cases {
newPVB := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb").Result()
scheme := runtime.NewScheme()
velerov1api.AddToScheme(scheme)
client := ctrlfake.NewClientBuilder().WithScheme(scheme).WithObjects(newPVB).Build()
lw := kube.InternalLW{
Client: client,
Namespace: velerov1api.DefaultNamespace,
ObjectList: new(velerov1api.PodVolumeBackupList),
}
informer := cache.NewSharedIndexInformer(&lw, &velerov1api.PodVolumeBackup{}, 0, cache.Indexers{})
ctx := context.Background()
go informer.Run(ctx.Done())
require.True(t, cache.WaitForCacheSync(ctx.Done(), informer.HasSynced))
logger := logrus.New()
logHook := &logHook{}
logger.Hooks.Add(logHook)
backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper.wg.Add(1)
if c.statusToBeUpdated != nil {
pvb := &velerov1api.PodVolumeBackup{}
err := client.Get(context.Background(), ctrlclient.ObjectKey{Namespace: newPVB.Namespace, Name: newPVB.Name}, pvb)
require.Nil(t, err)
pvb.Status = *c.statusToBeUpdated
err = client.Update(context.Background(), pvb)
require.Nil(t, err)
}
pvbs := backuper.WaitAllPodVolumesProcessed(logger)
if c.expectedErr != "" {
assert.Equal(t, c.expectedErr, logHook.entry.Message)
}
if c.expectedPVBPhase != "" {
require.Len(t, pvbs, 1)
assert.Equal(t, c.expectedPVBPhase, pvbs[0].Status.Phase)
}
}
}
func TestPVCBackupSummary(t *testing.T) {
pbs := NewPVCBackupSummary()
pbs.pvcMap["vol-1"] = builder.ForPersistentVolumeClaim("ns-1", "pvc-1").VolumeName("pv-1").Result()


@@ -118,7 +118,7 @@ func TestDeleteOldMaintenanceJobs(t *testing.T) {
assert.NoError(t, err)
// We expect the number of jobs to be equal to 'keep'
assert.Equal(t, keep, len(jobList.Items))
assert.Len(t, jobList.Items, keep)
// We expect that the oldest jobs were deleted
// Job3 should not be present in the remaining list