diff --git a/.gitignore b/.gitignore index 92c0a6f6c..e98574df7 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,5 @@ debug *.diff _site/ + +.vs diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 59e9e0f9a..91b673272 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -608,6 +608,7 @@ func (s *server) runControllers(config *api.Config) error { config.BackupStorageProvider.Bucket, config.BackupSyncPeriod.Duration, s.namespace, + s.sharedInformerFactory.Ark().V1().Backups(), s.logger, ) wg.Add(1) diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index d81ead537..a97ff1c2a 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -25,21 +25,28 @@ import ( kuberrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" + listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" "github.com/heptio/ark/pkg/util/kube" "github.com/heptio/ark/pkg/util/stringslice" ) type backupSyncController struct { - client arkv1client.BackupsGetter - backupService cloudprovider.BackupService - bucket string - syncPeriod time.Duration - namespace string - logger logrus.FieldLogger + client arkv1client.BackupsGetter + backupService cloudprovider.BackupService + bucket string + syncPeriod time.Duration + namespace string + backupLister listers.BackupLister + backupInformerSynced cache.InformerSynced + logger logrus.FieldLogger } func NewBackupSyncController( @@ -48,6 +55,7 @@ func NewBackupSyncController( bucket string, syncPeriod time.Duration, namespace string, + backupInformer informers.BackupInformer, logger logrus.FieldLogger, ) Interface { if syncPeriod < time.Minute { @@ -55,12 +63,14 @@ func NewBackupSyncController( syncPeriod = time.Minute } return &backupSyncController{ - client: client, - backupService: backupService, - bucket: bucket, - syncPeriod: syncPeriod, - namespace: namespace, - logger: logger, + client: client, + backupService: backupService, + bucket: bucket, + syncPeriod: syncPeriod, + namespace: namespace, + backupLister: backupInformer.Lister(), + backupInformerSynced: backupInformer.Informer().HasSynced, + logger: logger, } } @@ -69,6 +79,11 @@ func NewBackupSyncController( // receives on the ctx.Done() channel. func (c *backupSyncController) Run(ctx context.Context, workers int) error { c.logger.Info("Running backup sync controller") + c.logger.Info("Waiting for caches to sync") + if !cache.WaitForCacheSync(ctx.Done(), c.backupInformerSynced) { + return errors.New("timed out waiting for caches to sync") + } + c.logger.Info("Caches are synced") wait.Until(c.run, c.syncPeriod, ctx.Done()) return nil } @@ -84,10 +99,13 @@ func (c *backupSyncController) run() { } c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage") + cloudBackupNames := sets.NewString() for _, cloudBackup := range backups { logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup)) logContext.Info("Syncing backup") + cloudBackupNames.Insert(cloudBackup.Name) + // If we're syncing backups made by pre-0.8.0 versions, the server removes all finalizers // faster than the sync finishes. Just process them as we find them. cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer) @@ -107,4 +125,32 @@ func (c *backupSyncController) run() { } } } + + c.deleteUnused(cloudBackupNames) + return +} + +// deleteUnused deletes backup objects from Kubernetes if there is no corresponding backup in the object storage. +func (c *backupSyncController) deleteUnused(cloudBackupNames sets.String) { + // Backups objects in Kubernetes + backups, err := c.backupLister.Backups(c.namespace).List(labels.Everything()) + if err != nil { + c.logger.WithError(errors.WithStack(err)).Error("Error listing backup from Kubernetes") + } + if len(backups) == 0 { + return + } + + // For each backup object in Kubernetes, verify if has a corresponding backup in the object storage. If not, delete it. + for _, backup := range backups { + if !cloudBackupNames.Has(backup.Name) { + if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil { + c.logger.WithError(errors.WithStack(err)).Error("Error deleting unused backup from Kubernetes") + } else { + c.logger.Debugf("Deleted backup: %s", backup.Name) + } + } + } + + return } diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index e155b0532..de2623d83 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -21,12 +21,14 @@ import ( "time" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" core "k8s.io/client-go/testing" "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions" "github.com/heptio/ark/pkg/util/stringslice" arktest "github.com/heptio/ark/pkg/util/test" "github.com/pkg/errors" @@ -94,9 +96,10 @@ func TestBackupSyncControllerRun(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { var ( - bs = &arktest.BackupService{} - client = fake.NewSimpleClientset() - logger = arktest.NewLogger() + bs = &arktest.BackupService{} + client = fake.NewSimpleClientset() + sharedInformers = informers.NewSharedInformerFactory(client, 0) + logger = arktest.NewLogger() ) c := NewBackupSyncController( @@ -105,6 +108,7 @@ func TestBackupSyncControllerRun(t *testing.T) { "bucket", time.Duration(0), test.namespace, + sharedInformers.Ark().V1().Backups(), logger, ).(*backupSyncController) @@ -154,3 +158,138 @@ func TestBackupSyncControllerRun(t *testing.T) { }) } } + +func TestDeleteUnused(t *testing.T) { + tests := []struct { + name string + cloudBackups []*v1.Backup + k8sBackups []*arktest.TestBackup + namespace string + expectedDeletes sets.String + }{ + { + name: "no overlapping backups", + namespace: "ns-1", + cloudBackups: []*v1.Backup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, + }, + k8sBackups: []*arktest.TestBackup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupB").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted), + }, + expectedDeletes: sets.NewString("backupA", "backupB", "backupC"), + }, + { + name: "some overlapping backups", + namespace: "ns-1", + cloudBackups: []*v1.Backup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, + }, + k8sBackups: []*arktest.TestBackup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted), + }, + expectedDeletes: sets.NewString("backupC"), + }, + { + name: "all overlapping backups", + namespace: "ns-1", + cloudBackups: []*v1.Backup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, + }, + k8sBackups: []*arktest.TestBackup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithPhase(v1.BackupPhaseCompleted), + }, + expectedDeletes: sets.NewString(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var ( + bs = &arktest.BackupService{} + client = fake.NewSimpleClientset() + sharedInformers = informers.NewSharedInformerFactory(client, 0) + logger = arktest.NewLogger() + ) + + c := NewBackupSyncController( + client.ArkV1(), + bs, + "bucket", + time.Duration(0), + test.namespace, + sharedInformers.Ark().V1().Backups(), + logger, + ).(*backupSyncController) + + expectedDeleteActions := make([]core.Action, 0) + + // setup: insert backups into Kubernetes + for _, backup := range test.k8sBackups { + if test.expectedDeletes.Has(backup.Name) { + actionDelete := core.NewDeleteAction( + v1.SchemeGroupVersion.WithResource("backups"), + test.namespace, + backup.Name, + ) + expectedDeleteActions = append(expectedDeleteActions, actionDelete) + } + + // add test backup to informer: + err := sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup.Backup) + assert.NoError(t, err, "Error adding backup to informer") + + // add test backup to kubernetes: + _, err = client.Ark().Backups(test.namespace).Create(backup.Backup) + assert.NoError(t, err, "Error deleting from clientset") + } + + // get names of client backups + testBackupNames := sets.NewString() + for _, cloudBackup := range test.cloudBackups { + testBackupNames.Insert(cloudBackup.Name) + } + + c.deleteUnused(testBackupNames) + + numBackups, err := numBackups(t, client, c.namespace) + assert.NoError(t, err) + + expected := len(test.k8sBackups) - len(test.expectedDeletes) + assert.Equal(t, expected, numBackups) + + arktest.CompareActions(t, expectedDeleteActions, getDeleteActions(client.Actions())) + }) + } +} + +func getDeleteActions(actions []core.Action) []core.Action { + var deleteActions []core.Action + for _, action := range actions { + if action.GetVerb() == "delete" { + deleteActions = append(deleteActions, action) + } + } + return deleteActions +} + +func numBackups(t *testing.T, c *fake.Clientset, ns string) (int, error) { + t.Helper() + existingK8SBackups, err := c.ArkV1().Backups(ns).List(metav1.ListOptions{}) + if err != nil { + return 0, err + } + + return len(existingK8SBackups.Items), nil +}