Merge pull request #680 from carlisia/c-delete-from-etcd
Delete backups from etcd if they're not in storagepull/704/head
commit
82f1cd87dc
|
@ -38,3 +38,5 @@ debug
|
||||||
*.diff
|
*.diff
|
||||||
|
|
||||||
_site/
|
_site/
|
||||||
|
|
||||||
|
.vs
|
||||||
|
|
|
@ -608,6 +608,7 @@ func (s *server) runControllers(config *api.Config) error {
|
||||||
config.BackupStorageProvider.Bucket,
|
config.BackupStorageProvider.Bucket,
|
||||||
config.BackupSyncPeriod.Duration,
|
config.BackupSyncPeriod.Duration,
|
||||||
s.namespace,
|
s.namespace,
|
||||||
|
s.sharedInformerFactory.Ark().V1().Backups(),
|
||||||
s.logger,
|
s.logger,
|
||||||
)
|
)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
|
@ -25,21 +25,28 @@ import (
|
||||||
|
|
||||||
kuberrs "k8s.io/apimachinery/pkg/api/errors"
|
kuberrs "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
|
||||||
"github.com/heptio/ark/pkg/cloudprovider"
|
"github.com/heptio/ark/pkg/cloudprovider"
|
||||||
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
|
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
|
||||||
|
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
|
||||||
|
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
||||||
"github.com/heptio/ark/pkg/util/kube"
|
"github.com/heptio/ark/pkg/util/kube"
|
||||||
"github.com/heptio/ark/pkg/util/stringslice"
|
"github.com/heptio/ark/pkg/util/stringslice"
|
||||||
)
|
)
|
||||||
|
|
||||||
type backupSyncController struct {
|
type backupSyncController struct {
|
||||||
client arkv1client.BackupsGetter
|
client arkv1client.BackupsGetter
|
||||||
backupService cloudprovider.BackupService
|
backupService cloudprovider.BackupService
|
||||||
bucket string
|
bucket string
|
||||||
syncPeriod time.Duration
|
syncPeriod time.Duration
|
||||||
namespace string
|
namespace string
|
||||||
logger logrus.FieldLogger
|
backupLister listers.BackupLister
|
||||||
|
backupInformerSynced cache.InformerSynced
|
||||||
|
logger logrus.FieldLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBackupSyncController(
|
func NewBackupSyncController(
|
||||||
|
@ -48,6 +55,7 @@ func NewBackupSyncController(
|
||||||
bucket string,
|
bucket string,
|
||||||
syncPeriod time.Duration,
|
syncPeriod time.Duration,
|
||||||
namespace string,
|
namespace string,
|
||||||
|
backupInformer informers.BackupInformer,
|
||||||
logger logrus.FieldLogger,
|
logger logrus.FieldLogger,
|
||||||
) Interface {
|
) Interface {
|
||||||
if syncPeriod < time.Minute {
|
if syncPeriod < time.Minute {
|
||||||
|
@ -55,12 +63,14 @@ func NewBackupSyncController(
|
||||||
syncPeriod = time.Minute
|
syncPeriod = time.Minute
|
||||||
}
|
}
|
||||||
return &backupSyncController{
|
return &backupSyncController{
|
||||||
client: client,
|
client: client,
|
||||||
backupService: backupService,
|
backupService: backupService,
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
logger: logger,
|
backupLister: backupInformer.Lister(),
|
||||||
|
backupInformerSynced: backupInformer.Informer().HasSynced,
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,6 +79,11 @@ func NewBackupSyncController(
|
||||||
// receives on the ctx.Done() channel.
|
// receives on the ctx.Done() channel.
|
||||||
func (c *backupSyncController) Run(ctx context.Context, workers int) error {
|
func (c *backupSyncController) Run(ctx context.Context, workers int) error {
|
||||||
c.logger.Info("Running backup sync controller")
|
c.logger.Info("Running backup sync controller")
|
||||||
|
c.logger.Info("Waiting for caches to sync")
|
||||||
|
if !cache.WaitForCacheSync(ctx.Done(), c.backupInformerSynced) {
|
||||||
|
return errors.New("timed out waiting for caches to sync")
|
||||||
|
}
|
||||||
|
c.logger.Info("Caches are synced")
|
||||||
wait.Until(c.run, c.syncPeriod, ctx.Done())
|
wait.Until(c.run, c.syncPeriod, ctx.Done())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -84,10 +99,13 @@ func (c *backupSyncController) run() {
|
||||||
}
|
}
|
||||||
c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage")
|
c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage")
|
||||||
|
|
||||||
|
cloudBackupNames := sets.NewString()
|
||||||
for _, cloudBackup := range backups {
|
for _, cloudBackup := range backups {
|
||||||
logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup))
|
logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup))
|
||||||
logContext.Info("Syncing backup")
|
logContext.Info("Syncing backup")
|
||||||
|
|
||||||
|
cloudBackupNames.Insert(cloudBackup.Name)
|
||||||
|
|
||||||
// If we're syncing backups made by pre-0.8.0 versions, the server removes all finalizers
|
// If we're syncing backups made by pre-0.8.0 versions, the server removes all finalizers
|
||||||
// faster than the sync finishes. Just process them as we find them.
|
// faster than the sync finishes. Just process them as we find them.
|
||||||
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
|
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
|
||||||
|
@ -107,4 +125,32 @@ func (c *backupSyncController) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.deleteUnused(cloudBackupNames)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteUnused deletes backup objects from Kubernetes if there is no corresponding backup in the object storage.
|
||||||
|
func (c *backupSyncController) deleteUnused(cloudBackupNames sets.String) {
|
||||||
|
// Backups objects in Kubernetes
|
||||||
|
backups, err := c.backupLister.Backups(c.namespace).List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
c.logger.WithError(errors.WithStack(err)).Error("Error listing backup from Kubernetes")
|
||||||
|
}
|
||||||
|
if len(backups) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each backup object in Kubernetes, verify if has a corresponding backup in the object storage. If not, delete it.
|
||||||
|
for _, backup := range backups {
|
||||||
|
if !cloudBackupNames.Has(backup.Name) {
|
||||||
|
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
|
||||||
|
c.logger.WithError(errors.WithStack(err)).Error("Error deleting unused backup from Kubernetes")
|
||||||
|
} else {
|
||||||
|
c.logger.Debugf("Deleted backup: %s", backup.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,12 +21,14 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
|
|
||||||
"github.com/heptio/ark/pkg/apis/ark/v1"
|
"github.com/heptio/ark/pkg/apis/ark/v1"
|
||||||
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
|
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
|
||||||
|
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
|
||||||
"github.com/heptio/ark/pkg/util/stringslice"
|
"github.com/heptio/ark/pkg/util/stringslice"
|
||||||
arktest "github.com/heptio/ark/pkg/util/test"
|
arktest "github.com/heptio/ark/pkg/util/test"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -94,9 +96,10 @@ func TestBackupSyncControllerRun(t *testing.T) {
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
bs = &arktest.BackupService{}
|
bs = &arktest.BackupService{}
|
||||||
client = fake.NewSimpleClientset()
|
client = fake.NewSimpleClientset()
|
||||||
logger = arktest.NewLogger()
|
sharedInformers = informers.NewSharedInformerFactory(client, 0)
|
||||||
|
logger = arktest.NewLogger()
|
||||||
)
|
)
|
||||||
|
|
||||||
c := NewBackupSyncController(
|
c := NewBackupSyncController(
|
||||||
|
@ -105,6 +108,7 @@ func TestBackupSyncControllerRun(t *testing.T) {
|
||||||
"bucket",
|
"bucket",
|
||||||
time.Duration(0),
|
time.Duration(0),
|
||||||
test.namespace,
|
test.namespace,
|
||||||
|
sharedInformers.Ark().V1().Backups(),
|
||||||
logger,
|
logger,
|
||||||
).(*backupSyncController)
|
).(*backupSyncController)
|
||||||
|
|
||||||
|
@ -154,3 +158,138 @@ func TestBackupSyncControllerRun(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteUnused(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
cloudBackups []*v1.Backup
|
||||||
|
k8sBackups []*arktest.TestBackup
|
||||||
|
namespace string
|
||||||
|
expectedDeletes sets.String
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no overlapping backups",
|
||||||
|
namespace: "ns-1",
|
||||||
|
cloudBackups: []*v1.Backup{
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
|
||||||
|
},
|
||||||
|
k8sBackups: []*arktest.TestBackup{
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupB").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
},
|
||||||
|
expectedDeletes: sets.NewString("backupA", "backupB", "backupC"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some overlapping backups",
|
||||||
|
namespace: "ns-1",
|
||||||
|
cloudBackups: []*v1.Backup{
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
|
||||||
|
},
|
||||||
|
k8sBackups: []*arktest.TestBackup{
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
},
|
||||||
|
expectedDeletes: sets.NewString("backupC"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all overlapping backups",
|
||||||
|
namespace: "ns-1",
|
||||||
|
cloudBackups: []*v1.Backup{
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
|
||||||
|
},
|
||||||
|
k8sBackups: []*arktest.TestBackup{
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithPhase(v1.BackupPhaseCompleted),
|
||||||
|
},
|
||||||
|
expectedDeletes: sets.NewString(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
var (
|
||||||
|
bs = &arktest.BackupService{}
|
||||||
|
client = fake.NewSimpleClientset()
|
||||||
|
sharedInformers = informers.NewSharedInformerFactory(client, 0)
|
||||||
|
logger = arktest.NewLogger()
|
||||||
|
)
|
||||||
|
|
||||||
|
c := NewBackupSyncController(
|
||||||
|
client.ArkV1(),
|
||||||
|
bs,
|
||||||
|
"bucket",
|
||||||
|
time.Duration(0),
|
||||||
|
test.namespace,
|
||||||
|
sharedInformers.Ark().V1().Backups(),
|
||||||
|
logger,
|
||||||
|
).(*backupSyncController)
|
||||||
|
|
||||||
|
expectedDeleteActions := make([]core.Action, 0)
|
||||||
|
|
||||||
|
// setup: insert backups into Kubernetes
|
||||||
|
for _, backup := range test.k8sBackups {
|
||||||
|
if test.expectedDeletes.Has(backup.Name) {
|
||||||
|
actionDelete := core.NewDeleteAction(
|
||||||
|
v1.SchemeGroupVersion.WithResource("backups"),
|
||||||
|
test.namespace,
|
||||||
|
backup.Name,
|
||||||
|
)
|
||||||
|
expectedDeleteActions = append(expectedDeleteActions, actionDelete)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add test backup to informer:
|
||||||
|
err := sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup.Backup)
|
||||||
|
assert.NoError(t, err, "Error adding backup to informer")
|
||||||
|
|
||||||
|
// add test backup to kubernetes:
|
||||||
|
_, err = client.Ark().Backups(test.namespace).Create(backup.Backup)
|
||||||
|
assert.NoError(t, err, "Error deleting from clientset")
|
||||||
|
}
|
||||||
|
|
||||||
|
// get names of client backups
|
||||||
|
testBackupNames := sets.NewString()
|
||||||
|
for _, cloudBackup := range test.cloudBackups {
|
||||||
|
testBackupNames.Insert(cloudBackup.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.deleteUnused(testBackupNames)
|
||||||
|
|
||||||
|
numBackups, err := numBackups(t, client, c.namespace)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expected := len(test.k8sBackups) - len(test.expectedDeletes)
|
||||||
|
assert.Equal(t, expected, numBackups)
|
||||||
|
|
||||||
|
arktest.CompareActions(t, expectedDeleteActions, getDeleteActions(client.Actions()))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getDeleteActions(actions []core.Action) []core.Action {
|
||||||
|
var deleteActions []core.Action
|
||||||
|
for _, action := range actions {
|
||||||
|
if action.GetVerb() == "delete" {
|
||||||
|
deleteActions = append(deleteActions, action)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return deleteActions
|
||||||
|
}
|
||||||
|
|
||||||
|
func numBackups(t *testing.T, c *fake.Clientset, ns string) (int, error) {
|
||||||
|
t.Helper()
|
||||||
|
existingK8SBackups, err := c.ArkV1().Backups(ns).List(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(existingK8SBackups.Items), nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue