refactor pod volume context

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/8664/head
Lyndon-Li 2025-02-05 16:16:44 +08:00
parent 294bbbc69e
commit 5fd9df3e2c
2 changed files with 13 additions and 9 deletions

View File

@ -117,6 +117,7 @@ type kubernetesBackupper struct {
podCommandExecutor podexec.PodCommandExecutor podCommandExecutor podexec.PodCommandExecutor
podVolumeBackupperFactory podvolume.BackupperFactory podVolumeBackupperFactory podvolume.BackupperFactory
podVolumeTimeout time.Duration podVolumeTimeout time.Duration
podVolumeContext context.Context
defaultVolumesToFsBackup bool defaultVolumesToFsBackup bool
clientPageSize int clientPageSize int
uploaderType string uploaderType string
@ -308,12 +309,13 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
} }
} }
ctx, cancelFunc := context.WithTimeout(context.Background(), podVolumeTimeout) var cancelFunc context.CancelFunc
kb.podVolumeContext, cancelFunc = context.WithTimeout(context.Background(), podVolumeTimeout)
defer cancelFunc() defer cancelFunc()
var podVolumeBackupper podvolume.Backupper var podVolumeBackupper podvolume.Backupper
if kb.podVolumeBackupperFactory != nil { if kb.podVolumeBackupperFactory != nil {
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType) podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(kb.podVolumeContext, log, backupRequest.Backup, kb.uploaderType)
if err != nil { if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper") log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err) return errors.WithStack(err)
@ -489,7 +491,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock { if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items)) log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock) backedUpGRs := kb.backupItemBlock(*itemBlock)
for _, backedUpGR := range backedUpGRs { for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true backedUpGroupResources[backedUpGR] = true
} }
@ -661,7 +663,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
} }
} }
func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource { func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock // find pods in ItemBlock
// filter pods based on whether they still need to be backed up // filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks // this list will be used to run pre/post hooks
@ -703,7 +705,7 @@ func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock Ba
if len(postHookPods) > 0 { if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks") itemBlock.Log.Debug("Executing post hooks")
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1) itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods) go kb.handleItemBlockPostHooks(itemBlock, postHookPods)
} }
return grList return grList
@ -739,12 +741,12 @@ func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock
} }
// The hooks cannot execute until the PVBs to be processed // The hooks cannot execute until the PVBs to be processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) { func (kb *kubernetesBackupper) handleItemBlockPostHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done() defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()
// the post hooks will not execute until all PVBs of the item block pods are processed // the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil { if err := kb.waitUntilPVBsProcessed(kb.podVolumeContext, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock") log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return return
} }

View File

@ -108,6 +108,7 @@ type kubernetesRestorer struct {
namespaceClient corev1.NamespaceInterface namespaceClient corev1.NamespaceInterface
podVolumeRestorerFactory podvolume.RestorerFactory podVolumeRestorerFactory podvolume.RestorerFactory
podVolumeTimeout time.Duration podVolumeTimeout time.Duration
podVolumeContext go_context.Context
resourceTerminatingTimeout time.Duration resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration resourceTimeout time.Duration
resourcePriorities types.Priorities resourcePriorities types.Priorities
@ -249,12 +250,13 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
} }
} }
ctx, cancelFunc := go_context.WithTimeout(go_context.Background(), podVolumeTimeout) var cancelFunc go_context.CancelFunc
kr.podVolumeContext, cancelFunc = go_context.WithTimeout(go_context.Background(), podVolumeTimeout)
defer cancelFunc() defer cancelFunc()
var podVolumeRestorer podvolume.Restorer var podVolumeRestorer podvolume.Restorer
if kr.podVolumeRestorerFactory != nil { if kr.podVolumeRestorerFactory != nil {
podVolumeRestorer, err = kr.podVolumeRestorerFactory.NewRestorer(ctx, req.Restore) podVolumeRestorer, err = kr.podVolumeRestorerFactory.NewRestorer(kr.podVolumeContext, req.Restore)
if err != nil { if err != nil {
return results.Result{}, results.Result{Velero: []string{err.Error()}} return results.Result{}, results.Result{Velero: []string{err.Error()}}
} }