From 85a61b8e8de8c484f09beb39bc961a356070ae66 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Thu, 19 Jul 2018 13:31:30 -0700 Subject: [PATCH 1/2] return nil error if 404 encountered when deleting snapshots Signed-off-by: Steve Kriss --- pkg/cloudprovider/aws/block_store.go | 13 ++++++++++++- pkg/cloudprovider/azure/block_store.go | 12 +++++++++++- pkg/cloudprovider/gcp/block_store.go | 13 ++++++++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/cloudprovider/aws/block_store.go b/pkg/cloudprovider/aws/block_store.go index 69b08b387..8960e7c9b 100644 --- a/pkg/cloudprovider/aws/block_store.go +++ b/pkg/cloudprovider/aws/block_store.go @@ -20,6 +20,7 @@ import ( "regexp" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/pkg/errors" @@ -219,7 +220,17 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error { _, err := b.ec2.DeleteSnapshot(req) - return errors.WithStack(err) + // if it's a NotFound error, we don't need to return an error + // since the snapshot is not there. + // see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidSnapshot.NotFound" { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + return nil } var ebsVolumeIDRegex = regexp.MustCompile("vol-.*") diff --git a/pkg/cloudprovider/azure/block_store.go b/pkg/cloudprovider/azure/block_store.go index 0745066be..ca42f80a4 100644 --- a/pkg/cloudprovider/azure/block_store.go +++ b/pkg/cloudprovider/azure/block_store.go @@ -19,6 +19,7 @@ package azure import ( "context" "fmt" + "net/http" "os" "regexp" "strings" @@ -280,7 +281,16 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error { err = <-errChan - return errors.WithStack(err) + // if it's a 404 (not found) error, we don't need to return an error + // since the snapshot is not there. + if azureErr, ok := err.(autorest.DetailedError); ok && azureErr.StatusCode == http.StatusNotFound { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + return nil } func getComputeResourceName(subscription, resourceGroup, resource, name string) string { diff --git a/pkg/cloudprovider/gcp/block_store.go b/pkg/cloudprovider/gcp/block_store.go index 47e3900f6..0fe257131 100644 --- a/pkg/cloudprovider/gcp/block_store.go +++ b/pkg/cloudprovider/gcp/block_store.go @@ -19,6 +19,7 @@ package gcp import ( "encoding/json" "io/ioutil" + "net/http" "os" "github.com/pkg/errors" @@ -27,6 +28,7 @@ import ( "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" "k8s.io/apimachinery/pkg/runtime" @@ -212,7 +214,16 @@ func getSnapshotTags(arkTags map[string]string, diskDescription string, log logr func (b *blockStore) DeleteSnapshot(snapshotID string) error { _, err := b.gce.Snapshots.Delete(b.project, snapshotID).Do() - return errors.WithStack(err) + // if it's a 404 (not found) error, we don't need to return an error + // since the snapshot is not there. + if gcpErr, ok := err.(*googleapi.Error); ok && gcpErr.Code == http.StatusNotFound { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + return nil } func (b *blockStore) GetVolumeID(pv runtime.Unstructured) (string, error) { From 78cbdf95f3b5198acc5f534e3234d2c4e6fff324 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Thu, 19 Jul 2018 16:06:12 -0700 Subject: [PATCH 2/2] delete old deletion requests for backup when processing a new one Signed-off-by: Steve Kriss --- pkg/cmd/server/server.go | 1 + pkg/controller/backup_deletion_controller.go | 34 ++++- .../backup_deletion_controller_test.go | 123 ++++++++---------- pkg/controller/gc_controller.go | 35 ++++- pkg/controller/gc_controller_test.go | 67 +++++++++- 5 files changed, 183 insertions(+), 77 deletions(-) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 4f2e15236..4c023bbcd 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -606,6 +606,7 @@ func (s *server) runControllers(config *api.Config) error { gcController := controller.NewGCController( s.logger, s.sharedInformerFactory.Ark().V1().Backups(), + s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(), s.arkClient.ArkV1(), config.GCSyncPeriod.Duration, ) diff --git a/pkg/controller/backup_deletion_controller.go b/pkg/controller/backup_deletion_controller.go index aeae503b6..fcc4d8908 100644 --- a/pkg/controller/backup_deletion_controller.go +++ b/pkg/controller/backup_deletion_controller.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" + kubeerrs "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/cache" ) @@ -103,8 +104,7 @@ func NewBackupDeletionController( deleteBackupRequestInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueue, - UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, + AddFunc: c.enqueue, }, ) @@ -162,6 +162,12 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e return err } + // Remove any existing deletion requests for this backup so we only have + // one at a time + if errs := c.deleteExistingDeletionRequests(req, log); errs != nil { + return kubeerrs.NewAggregate(errs) + } + // Don't allow deleting an in-progress backup if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) { _, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) { @@ -303,6 +309,30 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e return nil } +func (c *backupDeletionController) deleteExistingDeletionRequests(req *v1.DeleteBackupRequest, log logrus.FieldLogger) []error { + log.Info("Removing existing deletion requests for backup") + selector := labels.SelectorFromSet(labels.Set(map[string]string{ + v1.BackupNameLabel: req.Spec.BackupName, + })) + dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(req.Namespace).List(selector) + if err != nil { + return []error{errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")} + } + + var errs []error + for _, dbr := range dbrs { + if dbr.Name == req.Name { + continue + } + + if err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(dbr.Name, nil); err != nil { + errs = append(errs, errors.WithStack(err)) + } + } + + return errs +} + func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error { if c.resticMgr == nil { return nil diff --git a/pkg/controller/backup_deletion_controller_test.go b/pkg/controller/backup_deletion_controller_test.go index 835249aed..51eb68df4 100644 --- a/pkg/controller/backup_deletion_controller_test.go +++ b/pkg/controller/backup_deletion_controller_test.go @@ -17,7 +17,6 @@ limitations under the License. package controller import ( - "context" "fmt" "testing" "time" @@ -26,7 +25,6 @@ import ( pkgbackup "github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" - "github.com/heptio/ark/pkg/util/kube" arktest "github.com/heptio/ark/pkg/util/test" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -36,74 +34,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" core "k8s.io/client-go/testing" ) -func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) { - req := pkgbackup.NewDeleteBackupRequest("foo", "uid") - req.Namespace = "heptio-ark" - expected := kube.NamespaceAndName(req) - - client := fake.NewSimpleClientset(req) - - fakeWatch := watch.NewFake() - defer fakeWatch.Stop() - client.PrependWatchReactor("deletebackuprequests", core.DefaultWatchReactor(fakeWatch, nil)) - - sharedInformers := informers.NewSharedInformerFactory(client, 0) - - controller := NewBackupDeletionController( - arktest.NewLogger(), - sharedInformers.Ark().V1().DeleteBackupRequests(), - client.ArkV1(), // deleteBackupRequestClient - client.ArkV1(), // backupClient - nil, // snapshotService - nil, // backupService - "bucket", - sharedInformers.Ark().V1().Restores(), - client.ArkV1(), // restoreClient - NewBackupTracker(), - nil, // restic repository manager - sharedInformers.Ark().V1().PodVolumeBackups(), - ).(*backupDeletionController) - - // disable resync handler since we don't want to test it here - controller.resyncFunc = nil - - keys := make(chan string) - - controller.syncHandler = func(key string) error { - keys <- key - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - go sharedInformers.Start(ctx.Done()) - go controller.Run(ctx, 1) - - // wait for the AddFunc - select { - case <-ctx.Done(): - t.Fatal("test timed out waiting for AddFunc") - case key := <-keys: - assert.Equal(t, expected, key) - } - - req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed - fakeWatch.Add(req) - - // wait for the UpdateFunc - select { - case <-ctx.Done(): - t.Fatal("test timed out waiting for UpdateFunc") - case key := <-keys: - assert.Equal(t, expected, key) - } -} - func TestBackupDeletionControllerProcessQueueItem(t *testing.T) { client := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(client, 0) @@ -236,6 +169,62 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) { assert.Equal(t, expectedActions, td.client.Actions()) }) + t.Run("existing deletion requests for the backup are deleted", func(t *testing.T) { + td := setupBackupDeletionControllerTest() + defer td.backupService.AssertExpectations(t) + + // add the backup to the tracker so the execution of processRequest doesn't progress + // past checking for an in-progress backup. this makes validation easier. + td.controller.backupTracker.Add(td.req.Namespace, td.req.Spec.BackupName) + + require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(td.req)) + + existing := &v1.DeleteBackupRequest{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: td.req.Namespace, + Name: "bar", + Labels: map[string]string{ + v1.BackupNameLabel: td.req.Spec.BackupName, + }, + }, + Spec: v1.DeleteBackupRequestSpec{ + BackupName: td.req.Spec.BackupName, + }, + } + require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(existing)) + _, err := td.client.ArkV1().DeleteBackupRequests(td.req.Namespace).Create(existing) + require.NoError(t, err) + + require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add( + &v1.DeleteBackupRequest{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: td.req.Namespace, + Name: "bar-2", + Labels: map[string]string{ + v1.BackupNameLabel: "some-other-backup", + }, + }, + Spec: v1.DeleteBackupRequestSpec{ + BackupName: "some-other-backup", + }, + }, + )) + + assert.NoError(t, td.controller.processRequest(td.req)) + + expectedDeleteAction := core.NewDeleteAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + "bar", + ) + + // first action is the Create of an existing DBR for the backup as part of test data setup + // second action is the Delete of the existing DBR, which we're validating + // third action is the Patch of the DBR to set it to processed with an error + require.Len(t, td.client.Actions(), 3) + assert.Equal(t, expectedDeleteAction, td.client.Actions()[1]) + }) + t.Run("deleting an in progress backup isn't allowed", func(t *testing.T) { td := setupBackupDeletionControllerTest() defer td.backupService.AssertExpectations(t) diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index 284663d4c..e0e2612f2 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/tools/cache" + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" @@ -39,6 +40,7 @@ type gcController struct { logger logrus.FieldLogger backupLister listers.BackupLister + deleteBackupRequestLister listers.DeleteBackupRequestLister deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter syncPeriod time.Duration @@ -49,6 +51,7 @@ type gcController struct { func NewGCController( logger logrus.FieldLogger, backupInformer informers.BackupInformer, + deleteBackupRequestInformer informers.DeleteBackupRequestInformer, deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter, syncPeriod time.Duration, ) Interface { @@ -62,12 +65,16 @@ func NewGCController( syncPeriod: syncPeriod, clock: clock.RealClock{}, backupLister: backupInformer.Lister(), + deleteBackupRequestLister: deleteBackupRequestInformer.Lister(), deleteBackupRequestClient: deleteBackupRequestClient, logger: logger, } c.syncHandler = c.processQueueItem - c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced) + c.cacheSyncWaiters = append(c.cacheSyncWaiters, + backupInformer.Informer().HasSynced, + deleteBackupRequestInformer.Informer().HasSynced, + ) c.resyncPeriod = syncPeriod c.resyncFunc = c.enqueueAllBackups @@ -130,12 +137,32 @@ func (c *gcController) processQueueItem(key string) error { return nil } - log.Info("Backup has expired. Creating a DeleteBackupRequest.") + log.Info("Backup has expired") + selector := labels.SelectorFromSet(labels.Set(map[string]string{ + arkv1api.BackupNameLabel: backup.Name, + arkv1api.BackupUIDLabel: string(backup.UID), + })) + + dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).List(selector) + if err != nil { + return errors.Wrap(err, "error listing existing DeleteBackupRequests for backup") + } + + // if there's an existing unprocessed deletion request for this backup, don't create + // another one + for _, dbr := range dbrs { + switch dbr.Status.Phase { + case "", arkv1api.DeleteBackupRequestPhaseNew, arkv1api.DeleteBackupRequestPhaseInProgress: + log.Info("Backup already has a pending deletion request") + return nil + } + } + + log.Info("Creating a new deletion request") req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID)) - _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req) - if err != nil { + if _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req); err != nil { return errors.Wrap(err, "error creating DeleteBackupRequest") } diff --git a/pkg/controller/gc_controller_test.go b/pkg/controller/gc_controller_test.go index bbd902214..0918a1325 100644 --- a/pkg/controller/gc_controller_test.go +++ b/pkg/controller/gc_controller_test.go @@ -25,7 +25,9 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/watch" @@ -46,6 +48,7 @@ func TestGCControllerEnqueueAllBackups(t *testing.T) { controller = NewGCController( arktest.NewLogger(), sharedInformers.Ark().V1().Backups(), + sharedInformers.Ark().V1().DeleteBackupRequests(), client.ArkV1(), 1*time.Millisecond, ).(*gcController) @@ -109,6 +112,7 @@ func TestGCControllerHasUpdateFunc(t *testing.T) { controller := NewGCController( arktest.NewLogger(), sharedInformers.Ark().V1().Backups(), + sharedInformers.Ark().V1().DeleteBackupRequests(), client.ArkV1(), 1*time.Millisecond, ).(*gcController) @@ -152,6 +156,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) { tests := []struct { name string backup *api.Backup + deleteBackupRequests []*api.DeleteBackupRequest expectDeletion bool createDeleteBackupRequestError bool expectError bool @@ -160,19 +165,63 @@ func TestGCControllerProcessQueueItem(t *testing.T) { name: "can't find backup - no error", }, { - name: "expired backup is deleted", + name: "unexpired backup is not deleted", + backup: arktest.NewTestBackup().WithName("backup-1"). + WithExpiration(fakeClock.Now().Add(1 * time.Minute)). + Backup, + expectDeletion: false, + }, + { + name: "expired backup with no pending deletion requests is deleted", backup: arktest.NewTestBackup().WithName("backup-1"). WithExpiration(fakeClock.Now().Add(-1 * time.Second)). Backup, expectDeletion: true, }, { - name: "unexpired backup is not deleted", + name: "expired backup with a pending deletion request is not deleted", backup: arktest.NewTestBackup().WithName("backup-1"). - WithExpiration(fakeClock.Now().Add(1 * time.Minute)). + WithExpiration(fakeClock.Now().Add(-1 * time.Second)). Backup, + deleteBackupRequests: []*api.DeleteBackupRequest{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: api.DefaultNamespace, + Name: "foo", + Labels: map[string]string{ + api.BackupNameLabel: "backup-1", + api.BackupUIDLabel: "", + }, + }, + Status: api.DeleteBackupRequestStatus{ + Phase: api.DeleteBackupRequestPhaseInProgress, + }, + }, + }, expectDeletion: false, }, + { + name: "expired backup with only processed deletion requests is deleted", + backup: arktest.NewTestBackup().WithName("backup-1"). + WithExpiration(fakeClock.Now().Add(-1 * time.Second)). + Backup, + deleteBackupRequests: []*api.DeleteBackupRequest{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: api.DefaultNamespace, + Name: "foo", + Labels: map[string]string{ + api.BackupNameLabel: "backup-1", + api.BackupUIDLabel: "", + }, + }, + Status: api.DeleteBackupRequestStatus{ + Phase: api.DeleteBackupRequestPhaseProcessed, + }, + }, + }, + expectDeletion: true, + }, { name: "create DeleteBackupRequest error returns an error", backup: arktest.NewTestBackup().WithName("backup-1"). @@ -194,6 +243,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) { controller := NewGCController( arktest.NewLogger(), sharedInformers.Ark().V1().Backups(), + sharedInformers.Ark().V1().DeleteBackupRequests(), client.ArkV1(), 1*time.Millisecond, ).(*gcController) @@ -205,6 +255,10 @@ func TestGCControllerProcessQueueItem(t *testing.T) { sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup) } + for _, dbr := range test.deleteBackupRequests { + sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(dbr) + } + if test.createDeleteBackupRequestError { client.PrependReactor("create", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { return true, nil, errors.New("foo") @@ -216,7 +270,12 @@ func TestGCControllerProcessQueueItem(t *testing.T) { assert.Equal(t, test.expectError, gotErr) if test.expectDeletion { - assert.Len(t, client.Actions(), 1) + require.Len(t, client.Actions(), 1) + + createAction, ok := client.Actions()[0].(core.CreateAction) + require.True(t, ok) + + assert.Equal(t, "deletebackuprequests", createAction.GetResource().Resource) } else { assert.Len(t, client.Actions(), 0) }