Merge pull request #687 from skriss/fix-delete-issues

fix delete issues
pull/693/head
Carlisia 2018-07-23 10:40:14 -07:00 committed by GitHub
commit c47a364ab3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 218 additions and 80 deletions

View File

@ -20,6 +20,7 @@ import (
"regexp"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pkg/errors"
@ -219,7 +220,17 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error {
_, err := b.ec2.DeleteSnapshot(req)
return errors.WithStack(err)
// if it's a NotFound error, we don't need to return an error
// since the snapshot is not there.
// see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidSnapshot.NotFound" {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
var ebsVolumeIDRegex = regexp.MustCompile("vol-.*")

View File

@ -19,6 +19,7 @@ package azure
import (
"context"
"fmt"
"net/http"
"os"
"regexp"
"strings"
@ -280,7 +281,16 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error {
err = <-errChan
return errors.WithStack(err)
// if it's a 404 (not found) error, we don't need to return an error
// since the snapshot is not there.
if azureErr, ok := err.(autorest.DetailedError); ok && azureErr.StatusCode == http.StatusNotFound {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
func getComputeResourceName(subscription, resourceGroup, resource, name string) string {

View File

@ -19,6 +19,7 @@ package gcp
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"github.com/pkg/errors"
@ -27,6 +28,7 @@ import (
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"k8s.io/apimachinery/pkg/runtime"
@ -212,7 +214,16 @@ func getSnapshotTags(arkTags map[string]string, diskDescription string, log logr
func (b *blockStore) DeleteSnapshot(snapshotID string) error {
_, err := b.gce.Snapshots.Delete(b.project, snapshotID).Do()
return errors.WithStack(err)
// if it's a 404 (not found) error, we don't need to return an error
// since the snapshot is not there.
if gcpErr, ok := err.(*googleapi.Error); ok && gcpErr.Code == http.StatusNotFound {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (b *blockStore) GetVolumeID(pv runtime.Unstructured) (string, error) {

View File

@ -666,6 +666,7 @@ func (s *server) runControllers(config *api.Config) error {
gcController := controller.NewGCController(
s.logger,
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(),
s.arkClient.ArkV1(),
config.GCSyncPeriod.Duration,
)

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
)
@ -103,8 +104,7 @@ func NewBackupDeletionController(
deleteBackupRequestInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
AddFunc: c.enqueue,
},
)
@ -162,6 +162,12 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return err
}
// Remove any existing deletion requests for this backup so we only have
// one at a time
if errs := c.deleteExistingDeletionRequests(req, log); errs != nil {
return kubeerrs.NewAggregate(errs)
}
// Don't allow deleting an in-progress backup
if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) {
_, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
@ -303,6 +309,30 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return nil
}
func (c *backupDeletionController) deleteExistingDeletionRequests(req *v1.DeleteBackupRequest, log logrus.FieldLogger) []error {
log.Info("Removing existing deletion requests for backup")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
v1.BackupNameLabel: req.Spec.BackupName,
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(req.Namespace).List(selector)
if err != nil {
return []error{errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")}
}
var errs []error
for _, dbr := range dbrs {
if dbr.Name == req.Name {
continue
}
if err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(dbr.Name, nil); err != nil {
errs = append(errs, errors.WithStack(err))
}
}
return errs
}
func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error {
if c.resticMgr == nil {
return nil

View File

@ -17,7 +17,6 @@ limitations under the License.
package controller
import (
"context"
"fmt"
"testing"
"time"
@ -26,7 +25,6 @@ import (
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/util/kube"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
@ -36,74 +34,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
)
func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) {
req := pkgbackup.NewDeleteBackupRequest("foo", "uid")
req.Namespace = "heptio-ark"
expected := kube.NamespaceAndName(req)
client := fake.NewSimpleClientset(req)
fakeWatch := watch.NewFake()
defer fakeWatch.Stop()
client.PrependWatchReactor("deletebackuprequests", core.DefaultWatchReactor(fakeWatch, nil))
sharedInformers := informers.NewSharedInformerFactory(client, 0)
controller := NewBackupDeletionController(
arktest.NewLogger(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(), // deleteBackupRequestClient
client.ArkV1(), // backupClient
nil, // snapshotService
nil, // backupService
"bucket",
sharedInformers.Ark().V1().Restores(),
client.ArkV1(), // restoreClient
NewBackupTracker(),
nil, // restic repository manager
sharedInformers.Ark().V1().PodVolumeBackups(),
).(*backupDeletionController)
// disable resync handler since we don't want to test it here
controller.resyncFunc = nil
keys := make(chan string)
controller.syncHandler = func(key string) error {
keys <- key
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go sharedInformers.Start(ctx.Done())
go controller.Run(ctx, 1)
// wait for the AddFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for AddFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
fakeWatch.Add(req)
// wait for the UpdateFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for UpdateFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
}
func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
@ -236,6 +169,62 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
assert.Equal(t, expectedActions, td.client.Actions())
})
t.Run("existing deletion requests for the backup are deleted", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)
// add the backup to the tracker so the execution of processRequest doesn't progress
// past checking for an in-progress backup. this makes validation easier.
td.controller.backupTracker.Add(td.req.Namespace, td.req.Spec.BackupName)
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(td.req))
existing := &v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar",
Labels: map[string]string{
v1.BackupNameLabel: td.req.Spec.BackupName,
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: td.req.Spec.BackupName,
},
}
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(existing))
_, err := td.client.ArkV1().DeleteBackupRequests(td.req.Namespace).Create(existing)
require.NoError(t, err)
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(
&v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar-2",
Labels: map[string]string{
v1.BackupNameLabel: "some-other-backup",
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: "some-other-backup",
},
},
))
assert.NoError(t, td.controller.processRequest(td.req))
expectedDeleteAction := core.NewDeleteAction(
v1.SchemeGroupVersion.WithResource("deletebackuprequests"),
td.req.Namespace,
"bar",
)
// first action is the Create of an existing DBR for the backup as part of test data setup
// second action is the Delete of the existing DBR, which we're validating
// third action is the Patch of the DBR to set it to processed with an error
require.Len(t, td.client.Actions(), 3)
assert.Equal(t, expectedDeleteAction, td.client.Actions()[1])
})
t.Run("deleting an in progress backup isn't allowed", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@ -39,6 +40,7 @@ type gcController struct {
logger logrus.FieldLogger
backupLister listers.BackupLister
deleteBackupRequestLister listers.DeleteBackupRequestLister
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter
syncPeriod time.Duration
@ -49,6 +51,7 @@ type gcController struct {
func NewGCController(
logger logrus.FieldLogger,
backupInformer informers.BackupInformer,
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter,
syncPeriod time.Duration,
) Interface {
@ -62,12 +65,16 @@ func NewGCController(
syncPeriod: syncPeriod,
clock: clock.RealClock{},
backupLister: backupInformer.Lister(),
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
deleteBackupRequestClient: deleteBackupRequestClient,
logger: logger,
}
c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced)
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
deleteBackupRequestInformer.Informer().HasSynced,
)
c.resyncPeriod = syncPeriod
c.resyncFunc = c.enqueueAllBackups
@ -130,12 +137,32 @@ func (c *gcController) processQueueItem(key string) error {
return nil
}
log.Info("Backup has expired. Creating a DeleteBackupRequest.")
log.Info("Backup has expired")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
arkv1api.BackupNameLabel: backup.Name,
arkv1api.BackupUIDLabel: string(backup.UID),
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).List(selector)
if err != nil {
return errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")
}
// if there's an existing unprocessed deletion request for this backup, don't create
// another one
for _, dbr := range dbrs {
switch dbr.Status.Phase {
case "", arkv1api.DeleteBackupRequestPhaseNew, arkv1api.DeleteBackupRequestPhaseInProgress:
log.Info("Backup already has a pending deletion request")
return nil
}
}
log.Info("Creating a new deletion request")
req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID))
_, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req)
if err != nil {
if _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req); err != nil {
return errors.Wrap(err, "error creating DeleteBackupRequest")
}

View File

@ -25,7 +25,9 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/watch"
@ -46,6 +48,7 @@ func TestGCControllerEnqueueAllBackups(t *testing.T) {
controller = NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@ -109,6 +112,7 @@ func TestGCControllerHasUpdateFunc(t *testing.T) {
controller := NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@ -152,6 +156,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
tests := []struct {
name string
backup *api.Backup
deleteBackupRequests []*api.DeleteBackupRequest
expectDeletion bool
createDeleteBackupRequestError bool
expectError bool
@ -160,19 +165,63 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
name: "can't find backup - no error",
},
{
name: "expired backup is deleted",
name: "unexpired backup is not deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1 * time.Minute)).
Backup,
expectDeletion: false,
},
{
name: "expired backup with no pending deletion requests is deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
expectDeletion: true,
},
{
name: "unexpired backup is not deleted",
name: "expired backup with a pending deletion request is not deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1 * time.Minute)).
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
deleteBackupRequests: []*api.DeleteBackupRequest{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "foo",
Labels: map[string]string{
api.BackupNameLabel: "backup-1",
api.BackupUIDLabel: "",
},
},
Status: api.DeleteBackupRequestStatus{
Phase: api.DeleteBackupRequestPhaseInProgress,
},
},
},
expectDeletion: false,
},
{
name: "expired backup with only processed deletion requests is deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
deleteBackupRequests: []*api.DeleteBackupRequest{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "foo",
Labels: map[string]string{
api.BackupNameLabel: "backup-1",
api.BackupUIDLabel: "",
},
},
Status: api.DeleteBackupRequestStatus{
Phase: api.DeleteBackupRequestPhaseProcessed,
},
},
},
expectDeletion: true,
},
{
name: "create DeleteBackupRequest error returns an error",
backup: arktest.NewTestBackup().WithName("backup-1").
@ -194,6 +243,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
controller := NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@ -205,6 +255,10 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup)
}
for _, dbr := range test.deleteBackupRequests {
sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(dbr)
}
if test.createDeleteBackupRequestError {
client.PrependReactor("create", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("foo")
@ -216,7 +270,12 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
assert.Equal(t, test.expectError, gotErr)
if test.expectDeletion {
assert.Len(t, client.Actions(), 1)
require.Len(t, client.Actions(), 1)
createAction, ok := client.Actions()[0].(core.CreateAction)
require.True(t, ok)
assert.Equal(t, "deletebackuprequests", createAction.GetResource().Resource)
} else {
assert.Len(t, client.Actions(), 0)
}