update sync controller for backup locations

Signed-off-by: Steve Kriss <steve@heptio.com>
Steve Kriss 2018-08-21 16:52:49 -07:00
parent 2750aa71b9
commit 0e94fa37f9
9 changed files with 382 additions and 535 deletions


@@ -36,4 +36,8 @@ const (
// a backup/restore-specific timeout value for pod volume operations (i.e.
// restic backups/restores).
PodVolumeOperationTimeoutAnnotation = "ark.heptio.com/pod-volume-timeout"
// StorageLocationLabel is the label key used to identify the storage
// location of a backup.
StorageLocationLabel = "ark.heptio.com/storage-location"
)
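
The new label makes it cheap to select every backup belonging to a given location with a standard label selector. A minimal sketch of that usage, assuming a generated Ark Backup lister (the helper name here is hypothetical):

package example

import (
	"k8s.io/apimachinery/pkg/labels"

	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
	listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
)

// backupsForLocation lists the in-cluster backups recorded against the given
// storage location, using StorageLocationLabel as a label selector.
func backupsForLocation(lister listers.BackupLister, namespace, location string) ([]*arkv1api.Backup, error) {
	selector := labels.Set(map[string]string{
		arkv1api.StorageLocationLabel: location,
	}).AsSelector()
	return lister.Backups(namespace).List(selector)
}

This mirrors the selector the reworked sync controller builds in deleteOrphanedBackups later in this commit.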


@@ -1,97 +0,0 @@
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudprovider
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/heptio/ark/pkg/apis/ark/v1"
)
// backupCacheBucket holds the backups and error from a ListBackups call.
type backupCacheBucket struct {
backups []*v1.Backup
error error
}
// backupCache caches ListBackups calls, refreshing them periodically.
type backupCache struct {
delegate BackupLister
lock sync.RWMutex
// This doesn't really need to be a map right now, but if we ever move to supporting multiple
// buckets, this will be ready for it.
buckets map[string]*backupCacheBucket
logger logrus.FieldLogger
}
var _ BackupLister = &backupCache{}
// NewBackupCache returns a new backup cache that refreshes from delegate every resyncPeriod.
func NewBackupCache(ctx context.Context, delegate BackupLister, resyncPeriod time.Duration, logger logrus.FieldLogger) BackupLister {
c := &backupCache{
delegate: delegate,
buckets: make(map[string]*backupCacheBucket),
logger: logger,
}
// Start the goroutine to refresh all buckets every resyncPeriod. This stops when ctx.Done() is
// closed.
go wait.Until(c.refresh, resyncPeriod, ctx.Done())
return c
}
// refresh refreshes all the buckets currently in the cache by doing a live lookup via c.delegate.
func (c *backupCache) refresh() {
c.lock.Lock()
defer c.lock.Unlock()
c.logger.Debug("refreshing all cached backup lists from object storage")
for bucketName, bucket := range c.buckets {
c.logger.WithField("bucket", bucketName).Debug("Refreshing bucket")
bucket.backups, bucket.error = c.delegate.ListBackups(bucketName)
}
}
func (c *backupCache) ListBackups(bucketName string) ([]*v1.Backup, error) {
c.lock.RLock()
bucket, found := c.buckets[bucketName]
c.lock.RUnlock()
logContext := c.logger.WithField("bucket", bucketName)
if found {
logContext.Debug("Returning cached backup list")
return bucket.backups, bucket.error
}
logContext.Debug("Bucket is not in cache - doing a live lookup")
backups, err := c.delegate.ListBackups(bucketName)
c.lock.Lock()
c.buckets[bucketName] = &backupCacheBucket{backups: backups, error: err}
c.lock.Unlock()
return backups, err
}
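
For reference, callers previously wired this cache in front of a live lister. The wiring below is a minimal sketch reconstructed from the server.go lines this commit removes; the resync period is illustrative:

package example

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"

	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
	"github.com/heptio/ark/pkg/cloudprovider"
)

// cachedList wraps a live bucket lister with the periodic cache: cache misses
// fall through to object storage, and cached buckets refresh every resync period.
func cachedList(ctx context.Context, store cloudprovider.ObjectStore, logger logrus.FieldLogger) ([]*arkv1api.Backup, error) {
	live := cloudprovider.NewLiveBackupLister(logger, store)
	cached := cloudprovider.NewBackupCache(ctx, live, 5*time.Minute, logger)
	return cached.ListBackups("bucket-1")
}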


@@ -1,170 +0,0 @@
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudprovider
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/heptio/ark/pkg/apis/ark/v1"
cloudprovidermocks "github.com/heptio/ark/pkg/cloudprovider/mocks"
"github.com/heptio/ark/pkg/util/test"
)
func TestNewBackupCache(t *testing.T) {
var (
delegate = &cloudprovidermocks.BackupLister{}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
logger = test.NewLogger()
)
defer cancel()
c := NewBackupCache(ctx, delegate, 100*time.Millisecond, logger)
// nothing in cache, live lookup
bucket1 := []*v1.Backup{
test.NewTestBackup().WithName("backup1").Backup,
test.NewTestBackup().WithName("backup2").Backup,
}
delegate.On("ListBackups", "bucket1").Return(bucket1, nil).Once()
// should be updated via refresh
updatedBucket1 := []*v1.Backup{
test.NewTestBackup().WithName("backup2").Backup,
}
delegate.On("ListBackups", "bucket1").Return(updatedBucket1, nil)
// nothing in cache, live lookup
bucket2 := []*v1.Backup{
test.NewTestBackup().WithName("backup5").Backup,
test.NewTestBackup().WithName("backup6").Backup,
}
delegate.On("ListBackups", "bucket2").Return(bucket2, nil).Once()
// should be updated via refresh
updatedBucket2 := []*v1.Backup{
test.NewTestBackup().WithName("backup7").Backup,
}
delegate.On("ListBackups", "bucket2").Return(updatedBucket2, nil)
backups, err := c.ListBackups("bucket1")
assert.Equal(t, bucket1, backups)
assert.NoError(t, err)
backups, err = c.ListBackups("bucket2")
assert.Equal(t, bucket2, backups)
assert.NoError(t, err)
var done1, done2 bool
for {
select {
case <-ctx.Done():
t.Fatal("timed out")
default:
if done1 && done2 {
return
}
}
backups, err = c.ListBackups("bucket1")
if len(backups) == 1 {
if assert.Equal(t, updatedBucket1[0], backups[0]) {
done1 = true
}
}
backups, err = c.ListBackups("bucket2")
if len(backups) == 1 {
if assert.Equal(t, updatedBucket2[0], backups[0]) {
done2 = true
}
}
time.Sleep(100 * time.Millisecond)
}
}
func TestBackupCacheRefresh(t *testing.T) {
var (
delegate = &cloudprovidermocks.BackupLister{}
logger = test.NewLogger()
)
c := &backupCache{
delegate: delegate,
buckets: map[string]*backupCacheBucket{
"bucket1": {},
"bucket2": {},
},
logger: logger,
}
bucket1 := []*v1.Backup{
test.NewTestBackup().WithName("backup1").Backup,
test.NewTestBackup().WithName("backup2").Backup,
}
delegate.On("ListBackups", "bucket1").Return(bucket1, nil)
delegate.On("ListBackups", "bucket2").Return(nil, errors.New("bad"))
c.refresh()
assert.Equal(t, bucket1, c.buckets["bucket1"].backups)
assert.NoError(t, c.buckets["bucket1"].error)
assert.Empty(t, c.buckets["bucket2"].backups)
assert.EqualError(t, c.buckets["bucket2"].error, "bad")
}
func TestBackupCacheGetAllBackupsUsesCacheIfPresent(t *testing.T) {
var (
delegate = &cloudprovidermocks.BackupLister{}
logger = test.NewLogger()
bucket1 = []*v1.Backup{
test.NewTestBackup().WithName("backup1").Backup,
test.NewTestBackup().WithName("backup2").Backup,
}
)
c := &backupCache{
delegate: delegate,
buckets: map[string]*backupCacheBucket{
"bucket1": {
backups: bucket1,
},
},
logger: logger,
}
bucket2 := []*v1.Backup{
test.NewTestBackup().WithName("backup3").Backup,
test.NewTestBackup().WithName("backup4").Backup,
}
delegate.On("ListBackups", "bucket2").Return(bucket2, nil)
backups, err := c.ListBackups("bucket1")
assert.Equal(t, bucket1, backups)
assert.NoError(t, err)
backups, err = c.ListBackups("bucket2")
assert.Equal(t, bucket2, backups)
assert.NoError(t, err)
}


@@ -147,22 +147,6 @@ func DownloadBackup(objectStore ObjectStore, bucket, backupName string) (io.Read
return objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName))
}
type liveBackupLister struct {
logger logrus.FieldLogger
objectStore ObjectStore
}
func NewLiveBackupLister(logger logrus.FieldLogger, objectStore ObjectStore) BackupLister {
return &liveBackupLister{
logger: logger,
objectStore: objectStore,
}
}
func (l *liveBackupLister) ListBackups(bucket string) ([]*api.Backup, error) {
return ListBackups(l.logger, l.objectStore, bucket)
}
func ListBackups(logger logrus.FieldLogger, objectStore ObjectStore, bucket string) ([]*api.Backup, error) {
prefixes, err := objectStore.ListCommonPrefixes(bucket, "/")
if err != nil {
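
With liveBackupLister and the cache gone, callers invoke the package-level ListBackups directly; the reworked sync controller injects it as a function value (its listCloudBackups field, shown later in this commit). A minimal sketch of a direct call, assuming a location with object storage configured:

package example

import (
	"github.com/sirupsen/logrus"

	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
	"github.com/heptio/ark/pkg/cloudprovider"
)

// listForLocation lists the backups stored in a location's bucket by calling
// cloudprovider.ListBackups directly, as the sync controller now does.
func listForLocation(log logrus.FieldLogger, store cloudprovider.ObjectStore, location *arkv1api.BackupStorageLocation) ([]*arkv1api.Backup, error) {
	return cloudprovider.ListBackups(log, store, location.Spec.ObjectStorage.Bucket)
}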


@@ -589,12 +589,6 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
ctx := s.ctx
var wg sync.WaitGroup
cloudBackupCacheResyncPeriod := durationMin(controller.GCSyncPeriod, s.config.backupSyncPeriod)
s.logger.Infof("Caching cloud backups every %s", cloudBackupCacheResyncPeriod)
liveBackupLister := cloudprovider.NewLiveBackupLister(s.logger, s.objectStore)
cachedBackupLister := cloudprovider.NewBackupCache(ctx, liveBackupLister, cloudBackupCacheResyncPeriod, s.logger)
go func() {
metricsMux := http.NewServeMux()
metricsMux.Handle("/metrics", promhttp.Handler())
@@ -608,12 +602,13 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
backupSyncController := controller.NewBackupSyncController(
s.arkClient.ArkV1(),
cachedBackupLister,
config.BackupStorageProvider.Bucket,
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
s.config.backupSyncPeriod,
s.namespace,
s.sharedInformerFactory.Ark().V1().Backups(),
s.pluginRegistry,
s.logger,
s.logLevel,
)
wg.Add(1)
go func() {
@@ -775,7 +770,7 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())
// Remove this sometime after v0.8.0
// TODO(1.0): remove
cache.WaitForCacheSync(ctx.Done(), s.sharedInformerFactory.Ark().V1().Backups().Informer().HasSynced)
s.removeDeprecatedGCFinalizer()
@@ -789,9 +784,10 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
return nil
}
const gcFinalizer = "gc.ark.heptio.com"
// TODO(1.0): remove
func (s *server) removeDeprecatedGCFinalizer() {
const gcFinalizer = "gc.ark.heptio.com"
backups, err := s.sharedInformerFactory.Ark().V1().Backups().Lister().List(labels.Everything())
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("error listing backups from cache - unable to remove old finalizers")
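
The finalizer cleanup relies on stringslice.Except, which removes only the matching entry and preserves any other finalizers on the backup. A minimal sketch of the pattern (the helper name is hypothetical):

package example

import (
	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
	"github.com/heptio/ark/pkg/util/stringslice"
)

const gcFinalizer = "gc.ark.heptio.com"

// stripGCFinalizer drops the deprecated pre-v0.8.0 GC finalizer from a backup
// while leaving all other finalizers in place.
func stripGCFinalizer(backup *arkv1api.Backup) {
	backup.Finalizers = stringslice.Except(backup.Finalizers, gcFinalizer)
}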


@@ -349,6 +349,12 @@ func (controller *backupController) getLocationAndValidate(itm *api.Backup, defa
itm.Spec.StorageLocation = defaultBackupLocation
}
// add the storage location as a label for easy filtering later.
if itm.Labels == nil {
itm.Labels = make(map[string]string)
}
itm.Labels[api.StorageLocationLabel] = itm.Spec.StorageLocation
var backupLocation *api.BackupStorageLocation
backupLocation, err := controller.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation)
if err != nil {
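
Because a freshly created Backup may have a nil label map, the controller initializes it before setting the label. The same defensive pattern, extracted as a standalone sketch (the helper name is hypothetical):

package example

import (
	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
)

// ensureStorageLocationLabel records a backup's storage location as a label so
// the backup can be filtered with a label selector later.
func ensureStorageLocationLabel(backup *arkv1api.Backup) {
	if backup.Labels == nil {
		backup.Labels = make(map[string]string)
	}
	backup.Labels[arkv1api.StorageLocationLabel] = backup.Spec.StorageLocation
}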


@@ -25,13 +25,12 @@ import (
"testing"
"time"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -364,10 +363,14 @@ func TestProcessBackup(t *testing.T) {
type SpecPatch struct {
StorageLocation string `json:"storageLocation"`
}
type ObjectMetaPatch struct {
Labels map[string]string `json:"labels"`
}
type Patch struct {
Status StatusPatch `json:"status"`
Spec SpecPatch `json:"spec,omitempty"`
Status StatusPatch `json:"status"`
Spec SpecPatch `json:"spec,omitempty"`
ObjectMeta ObjectMetaPatch `json:"metadata,omitempty"`
}
decode := func(decoder *json.Decoder) (interface{}, error) {
@@ -389,6 +392,11 @@
Spec: SpecPatch{
StorageLocation: "default",
},
ObjectMeta: ObjectMetaPatch{
Labels: map[string]string{
v1.StorageLocationLabel: "default",
},
},
}
} else {
expected = Patch{
@@ -397,6 +405,11 @@
Phase: v1.BackupPhaseInProgress,
Expiration: expiration,
},
ObjectMeta: ObjectMetaPatch{
Labels: map[string]string{
v1.StorageLocationLabel: test.backup.Spec.StorageLocation,
},
},
}
}


@@ -17,7 +17,6 @@ limitations under the License.
package controller
import (
"context"
"time"
"github.com/pkg/errors"
@@ -27,133 +26,172 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/plugin"
"github.com/heptio/ark/pkg/util/kube"
"github.com/heptio/ark/pkg/util/stringslice"
)
type backupSyncController struct {
client arkv1client.BackupsGetter
cloudBackupLister cloudprovider.BackupLister
bucket string
syncPeriod time.Duration
namespace string
backupLister listers.BackupLister
backupInformerSynced cache.InformerSynced
logger logrus.FieldLogger
*genericController
client arkv1client.BackupsGetter
backupLister listers.BackupLister
backupStorageLocationLister listers.BackupStorageLocationLister
namespace string
newPluginManager func(logrus.FieldLogger) plugin.Manager
listCloudBackups func(logrus.FieldLogger, cloudprovider.ObjectStore, string) ([]*arkv1api.Backup, error)
}
func NewBackupSyncController(
client arkv1client.BackupsGetter,
cloudBackupLister cloudprovider.BackupLister,
bucket string,
backupInformer informers.BackupInformer,
backupStorageLocationInformer informers.BackupStorageLocationInformer,
syncPeriod time.Duration,
namespace string,
backupInformer informers.BackupInformer,
pluginRegistry plugin.Registry,
logger logrus.FieldLogger,
logLevel logrus.Level,
) Interface {
if syncPeriod < time.Minute {
logger.Infof("Provided backup sync period %v is too short. Setting to 1 minute", syncPeriod)
syncPeriod = time.Minute
}
return &backupSyncController{
client: client,
cloudBackupLister: cloudBackupLister,
bucket: bucket,
syncPeriod: syncPeriod,
namespace: namespace,
backupLister: backupInformer.Lister(),
backupInformerSynced: backupInformer.Informer().HasSynced,
logger: logger,
}
}
// Run is a blocking function that continually runs the object storage -> Ark API
// sync process according to the controller's syncPeriod. It will return when it
// receives on the ctx.Done() channel.
func (c *backupSyncController) Run(ctx context.Context, workers int) error {
c.logger.Info("Running backup sync controller")
c.logger.Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), c.backupInformerSynced) {
return errors.New("timed out waiting for caches to sync")
c := &backupSyncController{
genericController: newGenericController("backup-sync", logger),
client: client,
namespace: namespace,
backupLister: backupInformer.Lister(),
backupStorageLocationLister: backupStorageLocationInformer.Lister(),
newPluginManager: func(logger logrus.FieldLogger) plugin.Manager {
return plugin.NewManager(logger, logLevel, pluginRegistry)
},
listCloudBackups: cloudprovider.ListBackups,
}
c.logger.Info("Caches are synced")
wait.Until(c.run, c.syncPeriod, ctx.Done())
return nil
c.resyncFunc = c.run
c.resyncPeriod = syncPeriod
c.cacheSyncWaiters = []cache.InformerSynced{
backupInformer.Informer().HasSynced,
backupStorageLocationInformer.Informer().HasSynced,
}
return c
}
const gcFinalizer = "gc.ark.heptio.com"
func (c *backupSyncController) run() {
c.logger.Info("Syncing backups from object storage")
backups, err := c.cloudBackupLister.ListBackups(c.bucket)
c.logger.Info("Syncing backups from backup storage into cluster")
locations, err := c.backupStorageLocationLister.BackupStorageLocations(c.namespace).List(labels.Everything())
if err != nil {
c.logger.WithError(err).Error("error listing backups")
c.logger.WithError(errors.WithStack(err)).Error("Error getting backup storage locations from lister")
return
}
c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage")
cloudBackupNames := sets.NewString()
for _, cloudBackup := range backups {
logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup))
logContext.Info("Syncing backup")
pluginManager := c.newPluginManager(c.logger)
cloudBackupNames.Insert(cloudBackup.Name)
for _, location := range locations {
log := c.logger.WithField("backupLocation", location.Name)
log.Info("Syncing backups from backup location")
// If we're syncing backups made by pre-0.8.0 versions, the server removes all finalizers
// faster than the sync finishes. Just process them as we find them.
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
cloudBackup.Namespace = c.namespace
cloudBackup.ResourceVersion = ""
// Backup only if backup does not exist in Kubernetes or if we are not able to get the backup for any reason.
_, err := c.client.Backups(cloudBackup.Namespace).Get(cloudBackup.Name, metav1.GetOptions{})
objectStore, err := getObjectStoreForLocation(location, pluginManager)
if err != nil {
log.WithError(err).Error("Error getting object store for location")
continue
}
backupsInBackupStore, err := c.listCloudBackups(log, objectStore, location.Spec.ObjectStorage.Bucket)
if err != nil {
log.WithError(err).Error("Error listing backups in object store")
continue
}
log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from object store")
cloudBackupNames := sets.NewString()
for _, cloudBackup := range backupsInBackupStore {
log = log.WithField("backup", kube.NamespaceAndName(cloudBackup))
log.Debug("Checking cloud backup to see if it needs to be synced into the cluster")
cloudBackupNames.Insert(cloudBackup.Name)
// use the controller's namespace when getting the backup because that's where we
// are syncing backups to, regardless of the namespace of the cloud backup.
_, err := c.client.Backups(c.namespace).Get(cloudBackup.Name, metav1.GetOptions{})
if err == nil {
log.Debug("Backup already exists in cluster")
continue
}
if !kuberrs.IsNotFound(err) {
logContext.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with backup sync")
log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster")
}
if _, err := c.client.Backups(cloudBackup.Namespace).Create(cloudBackup); err != nil && !kuberrs.IsAlreadyExists(err) {
logContext.WithError(errors.WithStack(err)).Error("Error syncing backup from object storage")
// remove the pre-v0.8.0 gcFinalizer if it exists
// TODO(1.0): remove this
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
cloudBackup.Namespace = c.namespace
cloudBackup.ResourceVersion = ""
// update the StorageLocation field and label since the name of the location
// may be different in this cluster than in the cluster that created the
// backup.
cloudBackup.Spec.StorageLocation = location.Name
if cloudBackup.Labels == nil {
cloudBackup.Labels = make(map[string]string)
}
cloudBackup.Labels[arkv1api.StorageLocationLabel] = cloudBackup.Spec.StorageLocation
_, err = c.client.Backups(cloudBackup.Namespace).Create(cloudBackup)
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debug("Backup already exists in cluster")
case err != nil && !kuberrs.IsAlreadyExists(err):
log.WithError(errors.WithStack(err)).Error("Error syncing backup into cluster")
default:
log.Debug("Synced backup into cluster")
}
}
}
c.deleteUnused(cloudBackupNames)
return
c.deleteOrphanedBackups(location.Name, cloudBackupNames, log)
}
}
// deleteUnused deletes backup objects from Kubernetes if they are complete
// and there is no corresponding backup in the object storage.
func (c *backupSyncController) deleteUnused(cloudBackupNames sets.String) {
// Backups objects in Kubernetes
backups, err := c.backupLister.Backups(c.namespace).List(labels.Everything())
// deleteOrphanedBackups deletes backup objects from Kubernetes that have the specified location
// and a phase of Completed, but no corresponding backup in object storage.
func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudBackupNames sets.String, log logrus.FieldLogger) {
locationSelector := labels.Set(map[string]string{
arkv1api.StorageLocationLabel: locationName,
}).AsSelector()
backups, err := c.backupLister.Backups(c.namespace).List(locationSelector)
if err != nil {
c.logger.WithError(errors.WithStack(err)).Error("Error listing backup from Kubernetes")
log.WithError(errors.WithStack(err)).Error("Error listing backups from cluster")
return
}
if len(backups) == 0 {
return
}
// For each completed backup object in Kubernetes, delete it if it
// does not have a corresponding backup in object storage
for _, backup := range backups {
if backup.Status.Phase == api.BackupPhaseCompleted && !cloudBackupNames.Has(backup.Name) {
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
c.logger.WithError(errors.WithStack(err)).Error("Error deleting unused backup from Kubernetes")
} else {
c.logger.Debugf("Deleted backup: %s", backup.Name)
}
log = log.WithField("backup", backup.Name)
if backup.Status.Phase != arkv1api.BackupPhaseCompleted || cloudBackupNames.Has(backup.Name) {
continue
}
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster")
} else {
log.Debug("Deleted orphaned backup from cluster")
}
}
return
}


@@ -20,281 +20,354 @@ import (
"testing"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
core "k8s.io/client-go/testing"
"github.com/heptio/ark/pkg/apis/ark/v1"
cloudprovidermocks "github.com/heptio/ark/pkg/cloudprovider/mocks"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
"github.com/heptio/ark/pkg/util/stringslice"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
func defaultLocationsList(namespace string) []*arkv1api.BackupStorageLocation {
return []*arkv1api.BackupStorageLocation{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "location-1",
},
Spec: arkv1api.BackupStorageLocationSpec{
Provider: "objStoreProvider",
StorageType: arkv1api.StorageType{
ObjectStorage: &arkv1api.ObjectStorageLocation{
Bucket: "bucket-1",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "location-2",
},
Spec: arkv1api.BackupStorageLocationSpec{
Provider: "objStoreProvider",
StorageType: arkv1api.StorageType{
ObjectStorage: &arkv1api.ObjectStorageLocation{
Bucket: "bucket-2",
},
},
},
},
}
}
func TestBackupSyncControllerRun(t *testing.T) {
tests := []struct {
name string
listBackupsError error
cloudBackups []*v1.Backup
namespace string
existingBackups sets.String
name string
namespace string
locations []*arkv1api.BackupStorageLocation
cloudBackups map[string][]*arkv1api.Backup
existingBackups []*arkv1api.Backup
}{
{
name: "no cloud backups",
},
{
name: "backup lister returns error on ListBackups",
listBackupsError: errors.New("listBackups"),
},
{
name: "normal case",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
name: "normal case",
namespace: "ns-1",
locations: defaultLocationsList("ns-1"),
cloudBackups: map[string][]*arkv1api.Backup{
"bucket-1": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
},
"bucket-2": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
},
},
{
name: "Finalizer gets removed on sync",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithFinalizers(gcFinalizer).Backup,
},
name: "gcFinalizer (only) gets removed on sync",
namespace: "ns-1",
locations: defaultLocationsList("ns-1"),
cloudBackups: map[string][]*arkv1api.Backup{
"bucket-1": {
arktest.NewTestBackup().WithNamespace("ns-1").WithFinalizers("a-finalizer", gcFinalizer, "some-other-finalizer").Backup,
},
},
},
{
name: "Only target finalizer is removed",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithFinalizers(gcFinalizer, "blah").Backup,
},
namespace: "ns-1",
},
{
name: "backups get created in Ark server's namespace",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-2").WithName("backup-2").Backup,
},
name: "all synced backups get created in Ark server's namespace",
namespace: "heptio-ark",
locations: defaultLocationsList("heptio-ark"),
cloudBackups: map[string][]*arkv1api.Backup{
"bucket-1": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
},
"bucket-2": {
arktest.NewTestBackup().WithNamespace("ns-2").WithName("backup-3").Backup,
arktest.NewTestBackup().WithNamespace("heptio-ark").WithName("backup-4").Backup,
},
},
},
{
name: "normal case with backups that already exist in Kubernetes",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
name: "new backups get synced when some cloud backups already exist in the cluster",
namespace: "ns-1",
locations: defaultLocationsList("ns-1"),
cloudBackups: map[string][]*arkv1api.Backup{
"bucket-1": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
},
"bucket-2": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-4").Backup,
},
},
existingBackups: []*arkv1api.Backup{
// add a label to each existing backup so we can differentiate it from the cloud
// backup during verification
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel("i-exist", "true").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithLabel("i-exist", "true").Backup,
},
},
{
name: "backup storage location names and labels get updated",
namespace: "ns-1",
locations: defaultLocationsList("ns-1"),
cloudBackups: map[string][]*arkv1api.Backup{
"bucket-1": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithStorageLocation("foo").WithLabel(arkv1api.StorageLocationLabel, "foo").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
},
"bucket-2": {
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithStorageLocation("bar").WithLabel(arkv1api.StorageLocationLabel, "bar").Backup,
},
},
existingBackups: sets.NewString("backup-2", "backup-3"),
namespace: "ns-1",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
backupLister = &cloudprovidermocks.BackupLister{}
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = arktest.NewLogger()
pluginManager = &pluginmocks.Manager{}
objectStore = &arktest.ObjectStore{}
)
c := NewBackupSyncController(
client.ArkV1(),
backupLister,
"bucket",
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().BackupStorageLocations(),
time.Duration(0),
test.namespace,
sharedInformers.Ark().V1().Backups(),
logger,
nil, // pluginRegistry
arktest.NewLogger(),
logrus.DebugLevel,
).(*backupSyncController)
backupLister.On("ListBackups", "bucket").Return(test.cloudBackups, test.listBackupsError)
c.newPluginManager = func(_ logrus.FieldLogger) plugin.Manager { return pluginManager }
pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil)
pluginManager.On("CleanupClients").Return(nil)
objectStore.On("Init", mock.Anything).Return(nil)
expectedActions := make([]core.Action, 0)
for _, location := range test.locations {
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location))
}
client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
if test.existingBackups.Has(getAction.GetName()) {
return true, nil, nil
c.listCloudBackups = func(_ logrus.FieldLogger, _ cloudprovider.ObjectStore, bucket string) ([]*arkv1api.Backup, error) {
backups, ok := test.cloudBackups[bucket]
if !ok {
return nil, errors.New("bucket not found")
}
// We return nil in place of the found backup object because
// we exclusively check for the error and don't use the object
// returned by the Get / Backups call.
return true, nil, apierrors.NewNotFound(v1.SchemeGroupVersion.WithResource("backups").GroupResource(), getAction.GetName())
})
return backups, nil
}
for _, existingBackup := range test.existingBackups {
require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(existingBackup))
_, err := client.ArkV1().Backups(test.namespace).Create(existingBackup)
require.NoError(t, err)
}
client.ClearActions()
c.run()
// we only expect creates for items within the target bucket
for _, cloudBackup := range test.cloudBackups {
// Verify that the run function stripped the GC finalizer
assert.False(t, stringslice.Has(cloudBackup.Finalizers, gcFinalizer))
assert.Equal(t, test.namespace, cloudBackup.Namespace)
for bucket, backups := range test.cloudBackups {
for _, cloudBackup := range backups {
obj, err := client.ArkV1().Backups(test.namespace).Get(cloudBackup.Name, metav1.GetOptions{})
require.NoError(t, err)
actionGet := core.NewGetAction(
v1.SchemeGroupVersion.WithResource("backups"),
test.namespace,
cloudBackup.Name,
)
expectedActions = append(expectedActions, actionGet)
// did this cloud backup already exist in the cluster?
var existing *arkv1api.Backup
for _, obj := range test.existingBackups {
if obj.Name == cloudBackup.Name {
existing = obj
break
}
}
if test.existingBackups.Has(cloudBackup.Name) {
continue
if existing != nil {
// if this cloud backup already exists in the cluster, make sure that what we get from the
// client is the existing backup, not the cloud one.
assert.Equal(t, existing, obj)
} else {
// verify that the GC finalizer is removed
assert.Equal(t, stringslice.Except(cloudBackup.Finalizers, gcFinalizer), obj.Finalizers)
// verify that the storage location field and label are set properly
for _, location := range test.locations {
if location.Spec.ObjectStorage.Bucket == bucket {
assert.Equal(t, location.Name, obj.Spec.StorageLocation)
assert.Equal(t, location.Name, obj.Labels[arkv1api.StorageLocationLabel])
break
}
}
}
}
actionCreate := core.NewCreateAction(
v1.SchemeGroupVersion.WithResource("backups"),
test.namespace,
cloudBackup,
)
expectedActions = append(expectedActions, actionCreate)
}
assert.Equal(t, expectedActions, client.Actions())
})
}
}
func TestDeleteUnused(t *testing.T) {
func TestDeleteOrphanedBackups(t *testing.T) {
tests := []struct {
name string
cloudBackups []*v1.Backup
cloudBackups sets.String
k8sBackups []*arktest.TestBackup
namespace string
expectedDeletes sets.String
}{
{
name: "no overlapping backups",
namespace: "ns-1",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
name: "no overlapping backups",
namespace: "ns-1",
cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"),
k8sBackups: []*arktest.TestBackup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupB").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupB").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
},
expectedDeletes: sets.NewString("backupA", "backupB", "backupC"),
},
{
name: "some overlapping backups",
namespace: "ns-1",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
name: "some overlapping backups",
namespace: "ns-1",
cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"),
k8sBackups: []*arktest.TestBackup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-C").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
},
expectedDeletes: sets.NewString("backupC"),
expectedDeletes: sets.NewString("backup-C"),
},
{
name: "all overlapping backups",
namespace: "ns-1",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
name: "all overlapping backups",
namespace: "ns-1",
cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"),
k8sBackups: []*arktest.TestBackup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
},
expectedDeletes: sets.NewString(),
},
{
name: "no overlapping backups but including backups that are not complete",
namespace: "ns-1",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
name: "no overlapping backups but including backups that are not complete",
namespace: "ns-1",
cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"),
k8sBackups: []*arktest.TestBackup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithPhase(v1.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("Deleting").WithPhase(v1.BackupPhaseDeleting),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("Failed").WithPhase(v1.BackupPhaseFailed),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("FailedValidation").WithPhase(v1.BackupPhaseFailedValidation),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("InProgress").WithPhase(v1.BackupPhaseInProgress),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("New").WithPhase(v1.BackupPhaseNew),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("Deleting").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseDeleting),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("Failed").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailed),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("FailedValidation").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailedValidation),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("InProgress").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseInProgress),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("New").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseNew),
},
expectedDeletes: sets.NewString("backupA"),
},
{
name: "all overlapping backups and all backups that are not complete",
namespace: "ns-1",
cloudBackups: []*v1.Backup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup,
},
name: "all overlapping backups and all backups that are not complete",
namespace: "ns-1",
cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"),
k8sBackups: []*arktest.TestBackup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseFailed),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseFailedValidation),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithPhase(v1.BackupPhaseInProgress),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailed),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailedValidation),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseInProgress),
},
expectedDeletes: sets.NewString(),
},
{
name: "no completed backups in other locations are deleted",
namespace: "ns-1",
cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"),
k8sBackups: []*arktest.TestBackup{
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-C").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-4").WithLabel(arkv1api.StorageLocationLabel, "alternate").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-5").WithLabel(arkv1api.StorageLocationLabel, "alternate").WithPhase(arkv1api.BackupPhaseCompleted),
arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-6").WithLabel(arkv1api.StorageLocationLabel, "alternate").WithPhase(arkv1api.BackupPhaseCompleted),
},
expectedDeletes: sets.NewString("backup-C"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
backupLister = &cloudprovidermocks.BackupLister{}
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = arktest.NewLogger()
)
c := NewBackupSyncController(
client.ArkV1(),
backupLister,
"bucket",
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().BackupStorageLocations(),
time.Duration(0),
test.namespace,
sharedInformers.Ark().V1().Backups(),
logger,
nil, // pluginRegistry
arktest.NewLogger(),
logrus.InfoLevel,
).(*backupSyncController)
expectedDeleteActions := make([]core.Action, 0)
// setup: insert backups into Kubernetes
for _, backup := range test.k8sBackups {
// add test backup to informer
require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup.Backup), "Error adding backup to informer")
// add test backup to client
_, err := client.Ark().Backups(test.namespace).Create(backup.Backup)
require.NoError(t, err, "Error adding backup to clientset")
// if we expect this backup to be deleted, set up the expected DeleteAction
if test.expectedDeletes.Has(backup.Name) {
actionDelete := core.NewDeleteAction(
v1.SchemeGroupVersion.WithResource("backups"),
arkv1api.SchemeGroupVersion.WithResource("backups"),
test.namespace,
backup.Name,
)
expectedDeleteActions = append(expectedDeleteActions, actionDelete)
}
// add test backup to informer:
err := sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup.Backup)
assert.NoError(t, err, "Error adding backup to informer")
// add test backup to kubernetes:
_, err = client.Ark().Backups(test.namespace).Create(backup.Backup)
assert.NoError(t, err, "Error deleting from clientset")
}
// get names of client backups
testBackupNames := sets.NewString()
for _, cloudBackup := range test.cloudBackups {
testBackupNames.Insert(cloudBackup.Name)
}
c.deleteUnused(testBackupNames)
c.deleteOrphanedBackups("default", test.cloudBackups, arktest.NewLogger())
numBackups, err := numBackups(t, client, c.namespace)
assert.NoError(t, err)