Merge pull request #4855 from reasonerjt/bak-delete-refact

Refactor backup deletion controller based on kubebuilder
Xun Jiang/Bruce Jiang 2022-04-28 10:48:11 +08:00 committed by GitHub
commit 001229a8b3
11 changed files with 560 additions and 1101 deletions

View File

@@ -0,0 +1 @@
Refactor backup deletion controller based on kubebuilder
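For context, kubebuilder's controller-runtime replaces the hand-rolled informer/workqueue machinery of the old generic controller with a Reconcile loop registered on a shared manager. A minimal sketch of the pattern this PR adopts (illustrative names, not Velero's actual code):

package example

import (
    "context"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// exampleReconciler: controller-runtime owns the informer, workqueue, and
// retry machinery that the old generic controller implemented by hand, and
// delivers one request per changed object.
type exampleReconciler struct {
    client.Client
}

func (r *exampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    obj := &velerov1api.DeleteBackupRequest{}
    if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
        // A deleted object is not an error worth retrying.
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    // ... inspect obj.Status.Phase and act, as the real reconciler below does ...
    return ctrl.Result{}, nil
}

func (r *exampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&velerov1api.DeleteBackupRequest{}).
        Complete(r)
}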

View File

@@ -16,7 +16,16 @@ spec:
singular: deletebackuprequest
scope: Namespaced
versions:
- name: v1
- additionalPrinterColumns:
- description: The name of the backup to be deleted
jsonPath: .spec.backupName
name: BackupName
type: string
- description: The status of the deletion request
jsonPath: .status.phase
name: Status
type: string
name: v1
schema:
openAPIV3Schema:
description: DeleteBackupRequest is a request to delete one or more backups.
@@ -63,6 +72,8 @@ spec:
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""

File diff suppressed because one or more lines are too long

View File

@@ -12,6 +12,7 @@ rules:
- backups
verbs:
- create
- delete
- apiGroups:
- velero.io
resources:
@@ -32,6 +33,26 @@ rules:
- get
- patch
- update
- apiGroups:
- velero.io
resources:
- deletebackuprequests
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- velero.io
resources:
- deletebackuprequests/status
verbs:
- get
- patch
- update
- apiGroups:
- velero.io
resources:
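These role rules are not hand-edited: in a kubebuilder layout, controller-gen derives them from +kubebuilder:rbac markers on the reconciler. The markers responsible for the additions above appear in the controller diff later in this PR:

// +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=delete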

View File

@@ -50,8 +50,15 @@ type DeleteBackupRequestStatus struct {
Errors []string `json:"errors,omitempty"`
}
// TODO(2.0) After converting all resources to use the controller-runtime client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
// +kubebuilder:object:generate=true
// +kubebuilder:storageversion
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="BackupName",type="string",JSONPath=".spec.backupName",description="The name of the backup to be deleted"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="The status of the deletion request"
// DeleteBackupRequest is a request to delete one or more backups.
type DeleteBackupRequest struct {
@@ -68,6 +75,7 @@ type DeleteBackupRequest struct {
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
// DeleteBackupRequestList is a list of DeleteBackupRequests.
type DeleteBackupRequestList struct {

View File

@@ -674,34 +674,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}
deletionControllerRunInfo := func() controllerRunInfo {
deletionController := controller.NewBackupDeletionController(
s.logger,
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests(),
s.veleroClient.VeleroV1(), // deleteBackupRequestClient
s.veleroClient.VeleroV1(), // backupClient
s.sharedInformerFactory.Velero().V1().Restores().Lister(),
s.veleroClient.VeleroV1(), // restoreClient
backupTracker,
s.resticManager,
s.sharedInformerFactory.Velero().V1().PodVolumeBackups().Lister(),
s.mgr.GetClient(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
csiVSLister,
csiVSCLister,
s.csiSnapshotClient,
newPluginManager,
backupStoreGetter,
s.metrics,
s.discoveryHelper,
)
return controllerRunInfo{
controller: deletionController,
numWorkers: defaultControllerWorkers,
}
}
restoreControllerRunInfo := func() controllerRunInfo {
restorer, err := restore.NewKubernetesRestorer(
s.veleroClient.VeleroV1(),
@@ -761,7 +733,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
controller.BackupSync: backupSyncControllerRunInfo,
controller.Backup: backupControllerRunInfo,
controller.GarbageCollection: gcControllerRunInfo,
controller.BackupDeletion: deletionControllerRunInfo,
controller.Restore: restoreControllerRunInfo,
controller.ResticRepo: resticRepoControllerRunInfo,
}
@@ -835,6 +806,19 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
}
if err := controller.NewBackupDeletionReconciler(
s.logger,
s.mgr.GetClient(),
backupTracker,
s.resticManager,
s.metrics,
s.discoveryHelper,
newPluginManager,
backupStoreGetter,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupDeletion)
}
if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok {
r := controller.ServerStatusRequestReconciler{
Scheme: s.mgr.GetScheme(),
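Note the lifecycle change in this file: the deletion controller no longer runs as a controllerRunInfo worker loop with its own queue and resync timer. SetupWithManager registers it with the shared controller-runtime manager, and a single manager start elsewhere in the server then drives every registered reconciler until shutdown; a sketch, assuming the usual manager API:

// One manager start runs all reconcilers registered via SetupWithManager.
if err := s.mgr.Start(ctx); err != nil {
    s.logger.WithError(err).Fatal("error running controller-runtime manager")
}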

View File

@@ -23,25 +23,19 @@ import (
"time"
jsonpatch "github.com/evanphx/json-patch"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/vmware-tanzu/velero/internal/delete"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
"github.com/vmware-tanzu/velero/pkg/discovery"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
@@ -54,250 +48,202 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
const resticTimeout = time.Minute
const (
resticTimeout = time.Minute
deleteBackupRequestMaxAge = 24 * time.Hour
)
type backupDeletionController struct {
*genericController
deleteBackupRequestClient velerov1client.DeleteBackupRequestsGetter
deleteBackupRequestLister velerov1listers.DeleteBackupRequestLister
backupClient velerov1client.BackupsGetter
restoreLister velerov1listers.RestoreLister
restoreClient velerov1client.RestoresGetter
backupTracker BackupTracker
resticMgr restic.RepositoryManager
podvolumeBackupLister velerov1listers.PodVolumeBackupLister
kbClient client.Client
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
csiSnapshotLister snapshotv1listers.VolumeSnapshotLister
csiSnapshotContentLister snapshotv1listers.VolumeSnapshotContentLister
csiSnapshotClient *snapshotterClientSet.Clientset
processRequestFunc func(*velerov1api.DeleteBackupRequest) error
clock clock.Clock
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
metrics *metrics.ServerMetrics
helper discovery.Helper
type backupDeletionReconciler struct {
client.Client
logger logrus.FieldLogger
backupTracker BackupTracker
resticMgr restic.RepositoryManager
metrics *metrics.ServerMetrics
clock clock.Clock
discoveryHelper discovery.Helper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
}
// NewBackupDeletionController creates a new backup deletion controller.
func NewBackupDeletionController(
// NewBackupDeletionReconciler creates a new backup deletion reconciler.
func NewBackupDeletionReconciler(
logger logrus.FieldLogger,
deleteBackupRequestInformer velerov1informers.DeleteBackupRequestInformer,
deleteBackupRequestClient velerov1client.DeleteBackupRequestsGetter,
backupClient velerov1client.BackupsGetter,
restoreLister velerov1listers.RestoreLister,
restoreClient velerov1client.RestoresGetter,
client client.Client,
backupTracker BackupTracker,
resticMgr restic.RepositoryManager,
podvolumeBackupLister velerov1listers.PodVolumeBackupLister,
kbClient client.Client,
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
csiSnapshotLister snapshotv1listers.VolumeSnapshotLister,
csiSnapshotContentLister snapshotv1listers.VolumeSnapshotContentLister,
csiSnapshotClient *snapshotterClientSet.Clientset,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
helper discovery.Helper,
) Interface {
c := &backupDeletionController{
genericController: newGenericController(BackupDeletion, logger),
deleteBackupRequestClient: deleteBackupRequestClient,
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
backupClient: backupClient,
restoreLister: restoreLister,
restoreClient: restoreClient,
backupTracker: backupTracker,
resticMgr: resticMgr,
podvolumeBackupLister: podvolumeBackupLister,
kbClient: kbClient,
snapshotLocationLister: snapshotLocationLister,
csiSnapshotLister: csiSnapshotLister,
csiSnapshotContentLister: csiSnapshotContentLister,
csiSnapshotClient: csiSnapshotClient,
metrics: metrics,
helper: helper,
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
) *backupDeletionReconciler {
return &backupDeletionReconciler{
Client: client,
logger: logger,
backupTracker: backupTracker,
resticMgr: resticMgr,
metrics: metrics,
clock: clock.RealClock{},
discoveryHelper: helper,
newPluginManager: newPluginManager,
backupStoreGetter: backupStoreGetter,
clock: &clock.RealClock{},
}
c.syncHandler = c.processQueueItem
c.processRequestFunc = c.processRequest
deleteBackupRequestInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
},
)
c.resyncPeriod = time.Hour
c.resyncFunc = c.deleteExpiredRequests
return c
}
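The function-typed fields (newPluginManager, backupStoreGetter) are kept precisely so tests can swap in fakes, as the comment above notes; an illustrative test-side construction (the fake variables are hypothetical):

r := NewBackupDeletionReconciler(
    velerotest.NewLogger(),
    fakeClient,                 // fake controller-runtime client
    NewBackupTracker(),
    nil,                        // no restic manager needed here
    metrics.NewServerMetrics(),
    fakeDiscoveryHelper,
    func(logrus.FieldLogger) clientmgmt.Manager { return pluginManagerMock },
    fakeBackupStoreGetter,
)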
func (c *backupDeletionController) processQueueItem(key string) error {
log := c.logger.WithField("key", key)
log.Debug("Running processItem")
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return errors.Wrap(err, "error splitting queue key")
}
req, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).Get(name)
if apierrors.IsNotFound(err) {
log.Debug("Unable to find DeleteBackupRequest")
return nil
}
if err != nil {
return errors.Wrap(err, "error getting DeleteBackupRequest")
}
switch req.Status.Phase {
case velerov1api.DeleteBackupRequestPhaseProcessed:
// Don't do anything because it's already been processed
default:
// Don't mutate the shared cache
reqCopy := req.DeepCopy()
return c.processRequestFunc(reqCopy)
}
return nil
func (r *backupDeletionReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Make sure the expired requests can be deleted eventually
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour)
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.DeleteBackupRequest{}).
Watches(s, nil).
Complete(r)
}
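Watches(s, nil) plugs in kube.NewPeriodicalEnqueueSource, a Velero helper that re-lists and re-enqueues every DeleteBackupRequest on an interval, so expired, already-processed requests are cleaned up even when no watch events arrive. A simplified sketch of what such a source does, assuming the source.Source interface of this controller-runtime era (not Velero's actual implementation):

import (
    "context"
    "time"

    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

type periodicSource struct {
    client client.Client
    period time.Duration
}

// Start lists the objects on a timer and feeds each into the controller's
// workqueue as a reconcile.Request.
func (s *periodicSource) Start(ctx context.Context, _ handler.EventHandler,
    q workqueue.RateLimitingInterface, _ ...predicate.Predicate) error {
    go wait.Until(func() {
        list := &velerov1api.DeleteBackupRequestList{}
        if err := s.client.List(ctx, list); err != nil {
            return // the real helper logs and retries on the next tick
        }
        for i := range list.Items {
            q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
                Namespace: list.Items[i].Namespace,
                Name:      list.Items[i].Name,
            }})
        }
    }, s.period, ctx.Done())
    return nil
}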
func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupRequest) error {
log := c.logger.WithFields(logrus.Fields{
"namespace": req.Namespace,
"name": req.Name,
"backup": req.Spec.BackupName,
// +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=delete
func (r *backupDeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.logger.WithFields(logrus.Fields{
"controller": BackupDeletion,
"deletebackuprequest": req.String(),
})
log.Debug("Getting deletebackuprequest")
dbr := &velerov1api.DeleteBackupRequest{}
if err := r.Get(ctx, req.NamespacedName, dbr); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find the deletebackuprequest")
return ctrl.Result{}, nil
}
log.WithError(err).Error("Error getting deletebackuprequest")
return ctrl.Result{}, err
}
var err error
// Since we use the reconciler along with the PeriodicalEnqueueSource, there may be reconciliations triggered by
// stale requests.
if dbr.Status.Phase == velerov1api.DeleteBackupRequestPhaseProcessed {
age := r.clock.Now().Sub(dbr.CreationTimestamp.Time)
if age >= deleteBackupRequestMaxAge { // delete the expired request
log.Debug("The request is expired, deleting it.")
if err := r.Delete(ctx, dbr); err != nil {
log.WithError(err).Error("Error deleting DeleteBackupRequest")
}
} else {
log.Info("The request has been processed, skip.")
}
return ctrl.Result{}, nil
}
// Make sure we have the backup name
if req.Spec.BackupName == "" {
_, err = c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
r.Status.Errors = []string{"spec.backupName is required"}
if dbr.Spec.BackupName == "" {
_, err := r.patchDeleteBackupRequest(ctx, dbr, func(res *velerov1api.DeleteBackupRequest) {
res.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
res.Status.Errors = []string{"spec.backupName is required"}
})
return err
return ctrl.Result{}, err
}
log = log.WithField("backup", dbr.Spec.BackupName)
// Remove any existing deletion requests for this backup so we only have
// one at a time
if errs := c.deleteExistingDeletionRequests(req, log); errs != nil {
return kubeerrs.NewAggregate(errs)
if errs := r.deleteExistingDeletionRequests(ctx, dbr, log); errs != nil {
return ctrl.Result{}, kubeerrs.NewAggregate(errs)
}
// Don't allow deleting an in-progress backup
if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) {
_, err = c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
if r.backupTracker.Contains(dbr.Namespace, dbr.Spec.BackupName) {
_, err := r.patchDeleteBackupRequest(ctx, dbr, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
r.Status.Errors = []string{"backup is still in progress"}
})
return err
return ctrl.Result{}, err
}
// Get the backup we're trying to delete
backup, err := c.backupClient.Backups(req.Namespace).Get(context.TODO(), req.Spec.BackupName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
backup := &velerov1api.Backup{}
if err := r.Get(ctx, types.NamespacedName{
Namespace: dbr.Namespace,
Name: dbr.Spec.BackupName,
}, backup); apierrors.IsNotFound(err) {
// Couldn't find backup - update status to Processed and record the not-found error
req, err = c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
_, err = r.patchDeleteBackupRequest(ctx, dbr, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
r.Status.Errors = []string{"backup not found"}
})
return err
}
if err != nil {
return errors.Wrap(err, "error getting backup")
return ctrl.Result{}, err
} else if err != nil {
return ctrl.Result{}, errors.Wrap(err, "error getting backup")
}
// Don't allow deleting backups in read-only storage locations
location := &velerov1api.BackupStorageLocation{}
if err := c.kbClient.Get(context.Background(), client.ObjectKey{
if err := r.Get(context.Background(), client.ObjectKey{
Namespace: backup.Namespace,
Name: backup.Spec.StorageLocation,
}, location); err != nil {
if apierrors.IsNotFound(err) {
_, err := c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
_, err := r.patchDeleteBackupRequest(ctx, dbr, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
r.Status.Errors = append(r.Status.Errors, fmt.Sprintf("backup storage location %s not found", backup.Spec.StorageLocation))
})
return err
return ctrl.Result{}, err
}
return errors.Wrap(err, "error getting backup storage location")
return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location")
}
if location.Spec.AccessMode == velerov1api.BackupStorageLocationAccessModeReadOnly {
_, err := c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
_, err := r.patchDeleteBackupRequest(ctx, dbr, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
r.Status.Errors = append(r.Status.Errors, fmt.Sprintf("cannot delete backup because backup storage location %s is currently in read-only mode", location.Name))
})
return err
return ctrl.Result{}, err
}
// if the request object has no labels defined, initialise an empty map since
// we will be updating labels
if req.Labels == nil {
req.Labels = map[string]string{}
if dbr.Labels == nil {
dbr.Labels = map[string]string{}
}
// Update status to InProgress and set backup-name label if needed
req, err = c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
// Update status to InProgress and set backup-name and backup-uid label if needed
dbr, err := r.patchDeleteBackupRequest(ctx, dbr, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseInProgress
if req.Labels[velerov1api.BackupNameLabel] == "" {
req.Labels[velerov1api.BackupNameLabel] = label.GetValidName(req.Spec.BackupName)
if r.Labels[velerov1api.BackupNameLabel] == "" {
r.Labels[velerov1api.BackupNameLabel] = label.GetValidName(dbr.Spec.BackupName)
}
if r.Labels[velerov1api.BackupUIDLabel] == "" {
r.Labels[velerov1api.BackupUIDLabel] = string(backup.UID)
}
})
if err != nil {
return err
}
// Set backup-uid label if needed
if req.Labels[velerov1api.BackupUIDLabel] == "" {
req, err = c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
req.Labels[velerov1api.BackupUIDLabel] = string(backup.UID)
})
if err != nil {
return err
}
return ctrl.Result{}, err
}
// Set backup status to Deleting
backup, err = c.patchBackup(backup, func(b *velerov1api.Backup) {
backup, err = r.patchBackup(ctx, backup, func(b *velerov1api.Backup) {
b.Status.Phase = velerov1api.BackupPhaseDeleting
})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error setting backup phase to deleting")
return err
return ctrl.Result{}, err
}
backupScheduleName := backup.GetLabels()[velerov1api.ScheduleNameLabel]
c.metrics.RegisterBackupDeletionAttempt(backupScheduleName)
r.metrics.RegisterBackupDeletionAttempt(backupScheduleName)
var errs []string
pluginManager := c.newPluginManager(log)
pluginManager := r.newPluginManager(log)
defer pluginManager.CleanupClients()
backupStore, err := c.backupStoreGetter.Get(location, pluginManager, log)
backupStore, err := r.backupStoreGetter.Get(location, pluginManager, log)
if err != nil {
return errors.Wrap(err, "error getting the backup store")
return ctrl.Result{}, errors.Wrap(err, "error getting the backup store")
}
actions, err := pluginManager.GetDeleteItemActions()
log.Debugf("%d actions before invoking actions", len(actions))
if err != nil {
return errors.Wrap(err, "error getting delete item actions")
return ctrl.Result{}, errors.Wrap(err, "error getting delete item actions")
}
// don't defer CleanupClients here, since it was already called above.
@@ -308,13 +254,13 @@ func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupR
if err != nil {
log.WithError(err).Errorf("Unable to download tarball for backup %s, skipping associated DeleteItemAction plugins", backup.Name)
} else {
defer closeAndRemoveFile(backupFile, c.logger)
defer closeAndRemoveFile(backupFile, r.logger)
ctx := &delete.Context{
Backup: backup,
BackupReader: backupFile,
Actions: actions,
Log: c.logger,
DiscoveryHelper: c.helper,
Log: r.logger,
DiscoveryHelper: r.discoveryHelper,
Filesystem: filesystem.NewFileSystem(),
}
@@ -322,11 +268,13 @@ func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupR
// but what do we do with the error returned? We can't just swallow it as that may lead to dangling resources.
err = delete.InvokeDeleteActions(ctx)
if err != nil {
return errors.Wrap(err, "error invoking delete item actions")
return ctrl.Result{}, errors.Wrap(err, "error invoking delete item actions")
}
}
}
var errs []string
if backupStore != nil {
log.Info("Removing PV snapshots")
@@ -340,7 +288,7 @@ func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupR
volumeSnapshotter, ok := volumeSnapshotters[snapshot.Spec.Location]
if !ok {
if volumeSnapshotter, err = volumeSnapshotterForSnapshotLocation(backup.Namespace, snapshot.Spec.Location, c.snapshotLocationLister, pluginManager); err != nil {
if volumeSnapshotter, err = volumeSnapshottersForVSL(ctx, backup.Namespace, snapshot.Spec.Location, r.Client, pluginManager); err != nil {
errs = append(errs, err.Error())
continue
}
@@ -353,9 +301,8 @@ func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupR
}
}
}
log.Info("Removing restic snapshots")
if deleteErrs := c.deleteResticSnapshots(backup); len(deleteErrs) > 0 {
if deleteErrs := r.deleteResticSnapshots(ctx, backup); len(deleteErrs) > 0 {
for _, err := range deleteErrs {
errs = append(errs, err.Error())
}
@@ -369,15 +316,19 @@ func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupR
}
log.Info("Removing restores")
if restores, err := c.restoreLister.Restores(backup.Namespace).List(labels.Everything()); err != nil {
restoreList := &velerov1api.RestoreList{}
selector := labels.Everything()
if err := r.List(ctx, restoreList, &client.ListOptions{
Namespace: backup.Namespace,
LabelSelector: selector,
}); err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing restore API objects")
} else {
for _, restore := range restores {
for _, restore := range restoreList.Items {
if restore.Spec.BackupName != backup.Name {
continue
}
restoreLog := log.WithField("restore", kube.NamespaceAndName(restore))
restoreLog := log.WithField("restore", kube.NamespaceAndName(&restore))
restoreLog.Info("Deleting restore log/results from backup storage")
if err := backupStore.DeleteRestore(restore.Name); err != nil {
@@ -387,202 +338,160 @@ func (c *backupDeletionController) processRequest(req *velerov1api.DeleteBackupR
}
restoreLog.Info("Deleting restore referencing backup")
if err := c.restoreClient.Restores(restore.Namespace).Delete(context.TODO(), restore.Name, metav1.DeleteOptions{}); err != nil {
errs = append(errs, errors.Wrapf(err, "error deleting restore %s", kube.NamespaceAndName(restore)).Error())
if err := r.Delete(ctx, &restore); err != nil {
errs = append(errs, errors.Wrapf(err, "error deleting restore %s", kube.NamespaceAndName(&restore)).Error())
}
}
}
if len(errs) == 0 {
// Only try to delete the backup object from kube if everything preceding went smoothly
err = c.backupClient.Backups(backup.Namespace).Delete(context.TODO(), backup.Name, metav1.DeleteOptions{})
if err != nil {
if err := r.Delete(ctx, backup); err != nil {
errs = append(errs, errors.Wrapf(err, "error deleting backup %s", kube.NamespaceAndName(backup)).Error())
}
}
if len(errs) == 0 {
c.metrics.RegisterBackupDeletionSuccess(backupScheduleName)
r.metrics.RegisterBackupDeletionSuccess(backupScheduleName)
} else {
c.metrics.RegisterBackupDeletionFailed(backupScheduleName)
r.metrics.RegisterBackupDeletionFailed(backupScheduleName)
}
// Update status to processed and record errors
req, err = c.patchDeleteBackupRequest(req, func(r *velerov1api.DeleteBackupRequest) {
if _, err := r.patchDeleteBackupRequest(ctx, dbr, func(r *velerov1api.DeleteBackupRequest) {
r.Status.Phase = velerov1api.DeleteBackupRequestPhaseProcessed
r.Status.Errors = errs
})
if err != nil {
return err
}); err != nil {
return ctrl.Result{}, err
}
// Everything deleted correctly, so we can delete all DeleteBackupRequests for this backup
if len(errs) == 0 {
listOptions := pkgbackup.NewDeleteBackupRequestListOptions(backup.Name, string(backup.UID))
err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, listOptions)
labelSelector, err := labels.Parse(fmt.Sprintf("%s=%s,%s=%s", velerov1api.BackupNameLabel, label.GetValidName(backup.Name), velerov1api.BackupUIDLabel, backup.UID))
if err != nil {
// Should not be here
r.logger.WithError(err).WithField("backup", kube.NamespaceAndName(backup)).Error("error creating label selector for the backup for deleting DeleteBackupRequests")
return ctrl.Result{}, nil
}
alldbr := &velerov1api.DeleteBackupRequest{}
err = r.DeleteAllOf(ctx, alldbr, client.MatchingLabelsSelector{
Selector: labelSelector,
}, client.InNamespace(dbr.Namespace))
if err != nil {
// If this errors, all we can do is log it.
c.logger.WithField("backup", kube.NamespaceAndName(backup)).Error("error deleting all associated DeleteBackupRequests after successfully deleting the backup")
r.logger.WithError(err).WithField("backup", kube.NamespaceAndName(backup)).Error("error deleting all associated DeleteBackupRequests after successfully deleting the backup")
}
}
log.Infof("Reconciliation done")
return nil
return ctrl.Result{}, nil
}
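One convention worth calling out in the rewritten Reconcile: terminal outcomes (missing backup name, backup not found, read-only location, already-processed request) record their result on the request's status and return a nil error, while transient failures return the error itself:

// controller-runtime's contract, relied on throughout Reconcile above:
//   return ctrl.Result{}, err  // requeued with exponential backoff
//   return ctrl.Result{}, nil  // done until the object changes, or the
//                              // periodic source re-enqueues it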
func volumeSnapshotterForSnapshotLocation(
namespace, snapshotLocationName string,
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
func volumeSnapshottersForVSL(
ctx context.Context,
namespace, vslName string,
client client.Client,
pluginManager clientmgmt.Manager,
) (velero.VolumeSnapshotter, error) {
snapshotLocation, err := snapshotLocationLister.VolumeSnapshotLocations(namespace).Get(snapshotLocationName)
vsl := &velerov1api.VolumeSnapshotLocation{}
if err := client.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: vslName,
}, vsl); err != nil {
return nil, errors.Wrapf(err, "error getting volume snapshot location %s", vslName)
}
volumeSnapshotter, err := pluginManager.GetVolumeSnapshotter(vsl.Spec.Provider)
if err != nil {
return nil, errors.Wrapf(err, "error getting volume snapshot location %s", snapshotLocationName)
return nil, errors.Wrapf(err, "error getting volume snapshotter for provider %s", vsl.Spec.Provider)
}
volumeSnapshotter, err := pluginManager.GetVolumeSnapshotter(snapshotLocation.Spec.Provider)
if err != nil {
return nil, errors.Wrapf(err, "error getting volume snapshotter for provider %s", snapshotLocation.Spec.Provider)
}
if err = volumeSnapshotter.Init(snapshotLocation.Spec.Config); err != nil {
return nil, errors.Wrapf(err, "error initializing volume snapshotter for volume snapshot location %s", snapshotLocationName)
if err = volumeSnapshotter.Init(vsl.Spec.Config); err != nil {
return nil, errors.Wrapf(err, "error initializing volume snapshotter for volume snapshot location %s", vslName)
}
return volumeSnapshotter, nil
}
func (c *backupDeletionController) deleteExistingDeletionRequests(req *velerov1api.DeleteBackupRequest, log logrus.FieldLogger) []error {
func (r *backupDeletionReconciler) deleteExistingDeletionRequests(ctx context.Context, req *velerov1api.DeleteBackupRequest, log logrus.FieldLogger) []error {
log.Info("Removing existing deletion requests for backup")
dbrList := &velerov1api.DeleteBackupRequestList{}
selector := label.NewSelectorForBackup(req.Spec.BackupName)
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(req.Namespace).List(selector)
if err != nil {
if err := r.List(ctx, dbrList, &client.ListOptions{
Namespace: req.Namespace,
LabelSelector: selector,
}); err != nil {
return []error{errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")}
}
var errs []error
for _, dbr := range dbrs {
for _, dbr := range dbrList.Items {
if dbr.Name == req.Name {
continue
}
if err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(context.TODO(), dbr.Name, metav1.DeleteOptions{}); err != nil {
if err := r.Delete(ctx, &dbr); err != nil {
errs = append(errs, errors.WithStack(err))
} else {
log.Infof("deletion request '%s' removed.", dbr.Name)
}
}
return errs
}
func (c *backupDeletionController) deleteResticSnapshots(backup *velerov1api.Backup) []error {
if c.resticMgr == nil {
func (r *backupDeletionReconciler) deleteResticSnapshots(ctx context.Context, backup *velerov1api.Backup) []error {
if r.resticMgr == nil {
return nil
}
snapshots, err := restic.GetSnapshotsInBackup(backup, c.podvolumeBackupLister)
snapshots, err := restic.GetSnapshotsInBackup(ctx, backup, r.Client)
if err != nil {
return []error{err}
}
ctx, cancelFunc := context.WithTimeout(context.Background(), resticTimeout)
ctx2, cancelFunc := context.WithTimeout(ctx, resticTimeout)
defer cancelFunc()
var errs []error
for _, snapshot := range snapshots {
if err := c.resticMgr.Forget(ctx, snapshot); err != nil {
if err := r.resticMgr.Forget(ctx2, snapshot); err != nil {
errs = append(errs, err)
}
}
return errs
}
const deleteBackupRequestMaxAge = 24 * time.Hour
func (c *backupDeletionController) deleteExpiredRequests() {
c.logger.Info("Checking for expired DeleteBackupRequests")
defer c.logger.Info("Done checking for expired DeleteBackupRequests")
// Our shared informer factory filters on a single namespace, so asking for all is ok here.
requests, err := c.deleteBackupRequestLister.List(labels.Everything())
func (r *backupDeletionReconciler) patchDeleteBackupRequest(ctx context.Context, req *velerov1api.DeleteBackupRequest, mutate func(*velerov1api.DeleteBackupRequest)) (*velerov1api.DeleteBackupRequest, error) {
patchHelper, err := patch.NewHelper(req, r.Client)
if err != nil {
c.logger.WithError(err).Error("unable to check for expired DeleteBackupRequests")
return
return nil, errors.Wrap(err, "unable to get the patch helper")
}
now := c.clock.Now()
for _, req := range requests {
if req.Status.Phase != velerov1api.DeleteBackupRequestPhaseProcessed {
continue
}
age := now.Sub(req.CreationTimestamp.Time)
if age >= deleteBackupRequestMaxAge {
reqLog := c.logger.WithFields(logrus.Fields{"namespace": req.Namespace, "name": req.Name})
reqLog.Info("Deleting expired DeleteBackupRequest")
err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(context.TODO(), req.Name, metav1.DeleteOptions{})
if err != nil {
reqLog.WithError(err).Error("Error deleting DeleteBackupRequest")
}
}
}
}
func (c *backupDeletionController) patchDeleteBackupRequest(req *velerov1api.DeleteBackupRequest, mutate func(*velerov1api.DeleteBackupRequest)) (*velerov1api.DeleteBackupRequest, error) {
// Record original json
oldData, err := json.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original DeleteBackupRequest")
}
// Mutate
mutate(req)
// Record new json
newData, err := json.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated DeleteBackupRequest")
if err := patchHelper.Patch(ctx, req); err != nil {
return nil, errors.Wrap(err, "error patching the deletebackuprquest")
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for DeleteBackupRequest")
}
req, err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Patch(context.TODO(), req.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching DeleteBackupRequest")
}
return req, nil
}
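The cluster-api patch.Helper used above snapshots the object in NewHelper and, in Patch, diffs that snapshot against the mutated object, issuing separate patches for the main resource and the status subresource. Roughly (a sketch of the helper's behavior, not its real code):

// helper, _ := patch.NewHelper(req, r.Client) // deep-copies req as "before"
// ...mutate req (labels, spec, status)...
// helper.Patch(ctx, req) then, approximately:
//   1. builds a merge patch from "before" to req for non-status fields
//      and applies it with client.Patch
//   2. builds a second patch for .status and applies it through
//      client.Status().Patch, honoring the status subresource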
func (c *backupDeletionController) patchBackup(backup *velerov1api.Backup, mutate func(*velerov1api.Backup)) (*velerov1api.Backup, error) {
func (r *backupDeletionReconciler) patchBackup(ctx context.Context, backup *velerov1api.Backup, mutate func(*velerov1api.Backup)) (*velerov1api.Backup, error) {
// TODO: The patchHelper can't be used here because the `backup/xxx/status` subresource does not exist until the backup resource is refactored
// Record original json
oldData, err := json.Marshal(backup)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original Backup")
}
// Mutate
mutate(backup)
// Record new json
newData, err := json.Marshal(backup)
newBackup := backup.DeepCopy()
mutate(newBackup)
newData, err := json.Marshal(newBackup)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated Backup")
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for Backup")
}
backup, err = c.backupClient.Backups(backup.Namespace).Patch(context.TODO(), backup.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
if err := r.Client.Patch(ctx, backup, client.RawPatch(types.MergePatchType, patchBytes)); err != nil {
return nil, errors.Wrap(err, "error patching Backup")
}
return backup, nil
}

File diff suppressed because it is too large

View File

@@ -17,6 +17,7 @@ limitations under the License.
package restic
import (
"context"
"fmt"
"os"
"strings"
@@ -26,10 +27,10 @@ import (
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
@@ -242,18 +243,21 @@ type SnapshotIdentifier struct {
// GetSnapshotsInBackup returns a list of all restic snapshot ids associated with
// a given Velero backup.
func GetSnapshotsInBackup(backup *velerov1api.Backup, podVolumeBackupLister velerov1listers.PodVolumeBackupLister) ([]SnapshotIdentifier, error) {
selector := labels.Set(map[string]string{
velerov1api.BackupNameLabel: label.GetValidName(backup.Name),
}).AsSelector()
func GetSnapshotsInBackup(ctx context.Context, backup *velerov1api.Backup, kbClient client.Client) ([]SnapshotIdentifier, error) {
podVolumeBackups := &velerov1api.PodVolumeBackupList{}
options := &client.ListOptions{
LabelSelector: labels.Set(map[string]string{
velerov1api.BackupNameLabel: label.GetValidName(backup.Name),
}).AsSelector(),
}
podVolumeBackups, err := podVolumeBackupLister.List(selector)
err := kbClient.List(ctx, podVolumeBackups, options)
if err != nil {
return nil, errors.WithStack(err)
}
var res []SnapshotIdentifier
for _, item := range podVolumeBackups {
for _, item := range podVolumeBackups.Items {
if item.Status.SnapshotID == "" {
continue
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package restic
import (
"context"
"os"
"sort"
"testing"
@ -28,8 +29,6 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
@@ -369,10 +368,8 @@ func TestGetSnapshotsInBackup(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
pvbInformer = sharedInformers.Velero().V1().PodVolumeBackups()
veleroBackup = &velerov1api.Backup{}
clientBuilder = velerotest.NewFakeControllerRuntimeClientBuilder(t)
veleroBackup = &velerov1api.Backup{}
)
veleroBackup.Name = "backup-1"
@@ -380,12 +377,11 @@
if test.longBackupNameEnabled {
veleroBackup.Name = "the-really-long-backup-name-that-is-much-more-than-63-characters"
}
clientBuilder.WithLists(&velerov1api.PodVolumeBackupList{
Items: test.podVolumeBackups,
})
for _, pvb := range test.podVolumeBackups {
require.NoError(t, pvbInformer.Informer().GetStore().Add(pvb.DeepCopy()))
}
res, err := GetSnapshotsInBackup(veleroBackup, pvbInformer.Lister())
res, err := GetSnapshotsInBackup(context.TODO(), veleroBackup, clientBuilder.Build())
assert.NoError(t, err)
// sort to ensure good compare of slices

View File

@@ -28,6 +28,15 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
func NewFakeControllerRuntimeClientBuilder(t *testing.T) *k8sfake.ClientBuilder {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
require.NoError(t, err)
err = corev1api.AddToScheme(scheme)
require.NoError(t, err)
return k8sfake.NewClientBuilder().WithScheme(scheme)
}
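Usage mirrors the refactored test above: seed the builder with lists, then build the fake client:

// From TestGetSnapshotsInBackup; test.podVolumeBackups is the table fixture.
clientBuilder := velerotest.NewFakeControllerRuntimeClientBuilder(t)
clientBuilder.WithLists(&velerov1api.PodVolumeBackupList{Items: test.podVolumeBackups})
res, err := GetSnapshotsInBackup(context.TODO(), veleroBackup, clientBuilder.Build())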
func NewFakeControllerRuntimeClient(t *testing.T, initObjs ...runtime.Object) client.Client {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)