Revert "Migrate backup sync controller from code-generator to kubebuilder (#4423)"

This reverts commit 5aaeb3ebbe.
pull/4457/head
Xun Jiang 2021-12-17 09:40:24 +08:00
parent 69f6c8d0cd
commit 7ab4bfc632
4 changed files with 617 additions and 544 deletions

View File

@ -1 +0,0 @@
Migrate backup sync controller from code-generator to kubebuilder.

View File

@ -589,6 +589,28 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
csiVSLister, csiVSCLister := s.getCSISnapshotListers()
backupSyncControllerRunInfo := func() controllerRunInfo {
backupSyncContoller := controller.NewBackupSyncController(
s.veleroClient.VeleroV1(),
s.mgr.GetClient(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.config.backupSyncPeriod,
s.namespace,
s.csiSnapshotClient,
s.kubeClient,
s.config.defaultBackupLocation,
newPluginManager,
backupStoreGetter,
s.logger,
)
return controllerRunInfo{
controller: backupSyncContoller,
numWorkers: defaultControllerWorkers,
}
}
backupTracker := controller.NewBackupTracker()
backupControllerRunInfo := func() controllerRunInfo {
@ -747,6 +769,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
enabledControllers := map[string]func() controllerRunInfo{
controller.BackupSync: backupSyncControllerRunInfo,
controller.Backup: backupControllerRunInfo,
controller.Schedule: scheduleControllerRunInfo,
controller.GarbageCollection: gcControllerRunInfo,
@ -758,7 +781,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
enabledRuntimeControllers := make(map[string]struct{})
enabledRuntimeControllers[controller.ServerStatusRequest] = struct{}{}
enabledRuntimeControllers[controller.DownloadRequest] = struct{}{}
enabledRuntimeControllers[controller.BackupSync] = struct{}{}
if s.config.restoreOnly {
s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers")
@ -849,28 +871,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}
if _, ok := enabledRuntimeControllers[controller.BackupSync]; ok {
syncPeriod := s.config.backupSyncPeriod
if syncPeriod <= 0 {
syncPeriod = time.Minute
}
r := controller.BackupSyncReconciler{
Client: s.mgr.GetClient(),
PodVolumeBackupClient: s.veleroClient.VeleroV1(),
BackupLister: s.sharedInformerFactory.Velero().V1().Backups().Lister(),
BackupClient: s.veleroClient.VeleroV1(),
Namespace: s.namespace,
DefaultBackupSyncPeriod: syncPeriod,
NewPluginManager: newPluginManager,
BackupStoreGetter: backupStoreGetter,
Logger: s.logger,
}
if err := r.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, " unable to create controller ", "controller ", controller.BackupSync)
}
}
// TODO(2.0): presuming all controllers and resources are converted to runtime-controller
// by v2.0, the block from this line and including the `s.mgr.Start() will be
// deprecated, since the manager auto-starts all the caches. Until then, we need to start the

View File

@ -1,5 +1,5 @@
/*
Copyright The Velero Contributors.
Copyright 2020 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -20,12 +20,14 @@ import (
"context"
"time"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
kuberrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"github.com/vmware-tanzu/velero/internal/storage"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@ -36,49 +38,114 @@ import (
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type BackupSyncReconciler struct {
Client client.Client
PodVolumeBackupClient velerov1client.PodVolumeBackupsGetter
BackupLister velerov1listers.BackupLister
BackupClient velerov1client.BackupsGetter
Namespace string
DefaultBackupSyncPeriod time.Duration
NewPluginManager func(logrus.FieldLogger) clientmgmt.Manager
BackupStoreGetter persistence.ObjectBackupStoreGetter
Logger logrus.FieldLogger
type backupSyncController struct {
*genericController
backupClient velerov1client.BackupsGetter
kbClient client.Client
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter
backupLister velerov1listers.BackupLister
csiSnapshotClient *snapshotterClientSet.Clientset
kubeClient kubernetes.Interface
namespace string
defaultBackupLocation string
defaultBackupSyncPeriod time.Duration
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
}
func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := b.Logger.WithField("controller", BackupSync)
log.Debug("Checking for existing backup storage locations to sync into cluster.")
func NewBackupSyncController(
backupClient velerov1client.BackupsGetter,
kbClient client.Client,
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter,
backupLister velerov1listers.BackupLister,
syncPeriod time.Duration,
namespace string,
csiSnapshotClient *snapshotterClientSet.Clientset,
kubeClient kubernetes.Interface,
defaultBackupLocation string,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
logger logrus.FieldLogger,
) Interface {
if syncPeriod <= 0 {
syncPeriod = time.Minute
}
logger.Infof("Backup sync period is %v", syncPeriod)
locationList, err := storage.ListBackupStorageLocations(ctx, b.Client, b.Namespace)
c := &backupSyncController{
genericController: newGenericController(BackupSync, logger),
backupClient: backupClient,
kbClient: kbClient,
podVolumeBackupClient: podVolumeBackupClient,
namespace: namespace,
defaultBackupLocation: defaultBackupLocation,
defaultBackupSyncPeriod: syncPeriod,
backupLister: backupLister,
csiSnapshotClient: csiSnapshotClient,
kubeClient: kubeClient,
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
newPluginManager: newPluginManager,
backupStoreGetter: backupStoreGetter,
}
c.resyncFunc = c.run
c.resyncPeriod = 30 * time.Second
return c
}
// orderedBackupLocations returns a new slice with the default backup location first (if it exists),
// followed by the rest of the locations in no particular order.
func orderedBackupLocations(locationList *velerov1api.BackupStorageLocationList, defaultLocationName string) []velerov1api.BackupStorageLocation {
var result []velerov1api.BackupStorageLocation
for i := range locationList.Items {
if locationList.Items[i].Name == defaultLocationName {
// put the default location first
result = append(result, locationList.Items[i])
// append everything before the default
result = append(result, locationList.Items[:i]...)
// append everything after the default
result = append(result, locationList.Items[i+1:]...)
return result
}
}
return locationList.Items
}
func (c *backupSyncController) run() {
c.logger.Debug("Checking for existing backup storage locations to sync into cluster")
locationList, err := storage.ListBackupStorageLocations(context.Background(), c.kbClient, c.namespace)
if err != nil {
log.WithError(err).Error("No backup storage locations found, at least one is required")
return ctrl.Result{}, err
c.logger.WithError(err).Error("No backup storage locations found, at least one is required")
return
}
// sync the default backup storage location first, if it exists
defaultBackupLocationName := ""
for _, location := range locationList.Items {
if location.Spec.Default {
defaultBackupLocationName = location.Name
c.defaultBackupLocation = location.Name
break
}
}
locations := orderedBackupLocations(&locationList, defaultBackupLocationName)
locations := orderedBackupLocations(&locationList, c.defaultBackupLocation)
pluginManager := b.NewPluginManager(log)
pluginManager := c.newPluginManager(c.logger)
defer pluginManager.CleanupClients()
for _, location := range locations {
log := log.WithField("backupLocation", location.Name)
log := c.logger.WithField("backupLocation", location.Name)
syncPeriod := b.DefaultBackupSyncPeriod
syncPeriod := c.defaultBackupSyncPeriod
if location.Spec.BackupSyncPeriod != nil {
syncPeriod = location.Spec.BackupSyncPeriod.Duration
if syncPeriod == 0 {
@ -88,7 +155,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if syncPeriod < 0 {
log.Debug("Backup sync period must be non-negative")
syncPeriod = b.DefaultBackupSyncPeriod
syncPeriod = c.defaultBackupSyncPeriod
}
}
@ -103,7 +170,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log.Debug("Checking backup location for backups to sync into cluster")
backupStore, err := b.BackupStoreGetter.Get(&location, pluginManager, log)
backupStore, err := c.backupStoreGetter.Get(&location, pluginManager, log)
if err != nil {
log.WithError(err).Error("Error getting backup store for this location")
continue
@ -119,7 +186,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log.WithField("backupCount", len(backupStoreBackups)).Debug("Got backups from backup store")
// get a list of all the backups that exist as custom resources in the cluster
clusterBackups, err := b.BackupLister.Backups(b.Namespace).List(labels.Everything())
clusterBackups, err := c.backupLister.Backups(c.namespace).List(labels.Everything())
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting backups from cluster, proceeding with sync into cluster")
} else {
@ -150,7 +217,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
continue
}
backup.Namespace = b.Namespace
backup.Namespace = c.namespace
backup.ResourceVersion = ""
// update the StorageLocation field and label since the name of the location
@ -163,7 +230,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation)
// attempt to create backup custom resource via API
backup, err = b.BackupClient.Backups(backup.Namespace).Create(ctx, backup, metav1.CreateOptions{})
backup, err = c.backupClient.Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{})
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debug("Backup already exists in cluster")
@ -200,7 +267,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
podVolumeBackup.Namespace = backup.Namespace
podVolumeBackup.ResourceVersion = ""
_, err = b.PodVolumeBackupClient.PodVolumeBackups(backup.Namespace).Create(ctx, podVolumeBackup, metav1.CreateOptions{})
_, err = c.podVolumeBackupClient.PodVolumeBackups(backup.Namespace).Create(context.TODO(), podVolumeBackup, metav1.CreateOptions{})
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debug("Pod volume backup already exists in cluster")
@ -227,7 +294,7 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
for _, snapCont := range snapConts {
// TODO: Reset ResourceVersion prior to persisting VolumeSnapshotContents
snapCont.ResourceVersion = ""
err := b.Client.Create(ctx, snapCont, &client.CreateOptions{})
created, err := c.csiSnapshotClient.SnapshotV1beta1().VolumeSnapshotContents().Create(context.TODO(), snapCont, metav1.CreateOptions{})
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debugf("volumesnapshotcontent %s already exists in cluster", snapCont.Name)
@ -236,45 +303,36 @@ func (b *BackupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log.WithError(errors.WithStack(err)).Errorf("Error syncing volumesnapshotcontent %s into cluster", snapCont.Name)
continue
default:
log.Infof("Created CSI volumesnapshotcontent %s", snapCont.Name)
log.Infof("Created CSI volumesnapshotcontent %s", created.Name)
}
}
}
}
b.deleteOrphanedBackups(ctx, location.Name, backupStoreBackups, log)
c.deleteOrphanedBackups(location.Name, backupStoreBackups, log)
// update the location's last-synced time field
statusPatch := client.MergeFrom(location.DeepCopy())
location.Status.LastSyncedTime = &metav1.Time{Time: time.Now().UTC()}
if err := b.Client.Status().Patch(ctx, &location, statusPatch); err != nil {
if err := c.kbClient.Status().Patch(context.Background(), &location, statusPatch); err != nil {
log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time")
continue
}
}
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 30}, nil
}
func (b *BackupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.Backup{}).
Complete(b)
}
// deleteOrphanedBackups deletes backup objects (CRDs) from Kubernetes that have the specified location
// and a phase of Completed, but no corresponding backup in object storage.
func (b *BackupSyncReconciler) deleteOrphanedBackups(ctx context.Context, locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) {
func (c *backupSyncController) deleteOrphanedBackups(locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) {
locationSelector := labels.Set(map[string]string{
velerov1api.StorageLocationLabel: label.GetValidName(locationName),
}).AsSelector()
backups, err := b.BackupLister.Backups(b.Namespace).List(locationSelector)
backups, err := c.backupLister.Backups(c.namespace).List(locationSelector)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing backups from cluster")
return
}
if len(backups) == 0 {
return
}
@ -285,31 +343,10 @@ func (b *BackupSyncReconciler) deleteOrphanedBackups(ctx context.Context, locati
continue
}
if err := b.BackupClient.Backups(backup.Namespace).Delete(ctx, backup.Name, metav1.DeleteOptions{}); err != nil {
if err := c.backupClient.Backups(backup.Namespace).Delete(context.TODO(), backup.Name, metav1.DeleteOptions{}); err != nil {
log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster")
} else {
log.Debug("Deleted orphaned backup from cluster")
}
}
}
// orderedBackupLocations returns a new slice with the default backup location first (if it exists),
// followed by the rest of the locations in no particular order.
func orderedBackupLocations(locationList *velerov1api.BackupStorageLocationList, defaultLocationName string) []velerov1api.BackupStorageLocation {
var result []velerov1api.BackupStorageLocation
for i := range locationList.Items {
if locationList.Items[i].Name == defaultLocationName {
// put the default location first
result = append(result, locationList.Items[i])
// append everything before the default
result = append(result, locationList.Items[:i]...)
// append everything after the default
result = append(result, locationList.Items[i+1:]...)
return result
}
}
return locationList.Items
}

File diff suppressed because it is too large Load Diff