Issue fix 5226 (#5768)
* fix issue 5226 Signed-off-by: Lyndon-Li <lyonghui@vmware.com>pull/5840/head
parent
0933dd906f
commit
53c3f4b436
|
@ -0,0 +1 @@
|
|||
Fix issue 5226, invalidate the related backup repositories whenever the backup storage info change in BSL
|
|
@ -17,18 +17,25 @@ limitations under the License.
|
|||
package controller
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/label"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
|
@ -64,12 +71,88 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client
|
|||
|
||||
func (r *BackupRepoReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&velerov1api.BackupRepository{}).
|
||||
Watches(s, nil).
|
||||
Watches(&source.Kind{Type: &velerov1api.BackupStorageLocation{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.invalidateBackupReposForBSL),
|
||||
builder.WithPredicates(kube.NewUpdateEventPredicate(r.needInvalidBackupRepo))).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *BackupRepoReconciler) invalidateBackupReposForBSL(bslObj client.Object) []reconcile.Request {
|
||||
bsl := bslObj.(*velerov1api.BackupStorageLocation)
|
||||
|
||||
list := &velerov1api.BackupRepositoryList{}
|
||||
options := &client.ListOptions{
|
||||
LabelSelector: labels.Set(map[string]string{
|
||||
velerov1api.StorageLocationLabel: label.GetValidName(bsl.Name),
|
||||
}).AsSelector(),
|
||||
}
|
||||
if err := r.List(context.TODO(), list, options); err != nil {
|
||||
r.logger.WithField("BSL", bsl.Name).WithError(err).Error("unable to list BackupRepositorys")
|
||||
return []reconcile.Request{}
|
||||
}
|
||||
|
||||
for i := range list.Items {
|
||||
r.logger.WithField("BSL", bsl.Name).Infof("Invalidating Backup Repository %s", list.Items[i].Name)
|
||||
r.patchBackupRepository(context.Background(), &list.Items[i], repoNotReady("re-establish on BSL change"))
|
||||
}
|
||||
|
||||
return []reconcile.Request{}
|
||||
}
|
||||
|
||||
func (r *BackupRepoReconciler) needInvalidBackupRepo(oldObj client.Object, newObj client.Object) bool {
|
||||
oldBSL := oldObj.(*velerov1api.BackupStorageLocation)
|
||||
newBSL := newObj.(*velerov1api.BackupStorageLocation)
|
||||
|
||||
oldStorage := oldBSL.Spec.StorageType.ObjectStorage
|
||||
newStorage := newBSL.Spec.StorageType.ObjectStorage
|
||||
oldConfig := oldBSL.Spec.Config
|
||||
newConfig := newBSL.Spec.Config
|
||||
|
||||
if oldStorage == nil {
|
||||
oldStorage = &velerov1api.ObjectStorageLocation{}
|
||||
}
|
||||
|
||||
if newStorage == nil {
|
||||
newStorage = &velerov1api.ObjectStorageLocation{}
|
||||
}
|
||||
|
||||
logger := r.logger.WithField("BSL", newBSL.Name)
|
||||
|
||||
if oldStorage.Bucket != newStorage.Bucket {
|
||||
logger.WithFields(logrus.Fields{
|
||||
"old bucket": oldStorage.Bucket,
|
||||
"new bucket": newStorage.Bucket,
|
||||
}).Info("BSL's bucket has changed, invalid backup repositories")
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
if oldStorage.Prefix != newStorage.Prefix {
|
||||
logger.WithFields(logrus.Fields{
|
||||
"old prefix": oldStorage.Prefix,
|
||||
"new prefix": newStorage.Prefix,
|
||||
}).Info("BSL's prefix has changed, invalid backup repositories")
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
if !bytes.Equal(oldStorage.CACert, newStorage.CACert) {
|
||||
logger.Info("BSL's CACert has changed, invalid backup repositories")
|
||||
return true
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(oldConfig, newConfig) {
|
||||
logger.Info("BSL's storage config has changed, invalid backup repositories")
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
log := r.logger.WithField("backupRepo", req.String())
|
||||
backupRepo := &velerov1api.BackupRepository{}
|
||||
|
@ -109,20 +192,29 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
|||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *velerov1api.BackupRepository) (string, error) {
|
||||
loc := &velerov1api.BackupStorageLocation{}
|
||||
|
||||
if err := r.Get(ctx, client.ObjectKey{
|
||||
Namespace: req.Namespace,
|
||||
Name: req.Spec.BackupStorageLocation,
|
||||
}, loc); err != nil {
|
||||
return "", errors.Wrapf(err, "error to get BSL %s", req.Spec.BackupStorageLocation)
|
||||
}
|
||||
|
||||
repoIdentifier, err := repoconfig.GetRepoIdentifier(loc, req.Spec.VolumeNamespace)
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "error to get identifier for repo %s", req.Name)
|
||||
}
|
||||
|
||||
return repoIdentifier, nil
|
||||
}
|
||||
|
||||
func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
|
||||
log.Info("Initializing backup repository")
|
||||
|
||||
// confirm the repo's BackupStorageLocation is valid
|
||||
loc := &velerov1api.BackupStorageLocation{}
|
||||
|
||||
if err := r.Get(context.Background(), client.ObjectKey{
|
||||
Namespace: req.Namespace,
|
||||
Name: req.Spec.BackupStorageLocation,
|
||||
}, loc); err != nil {
|
||||
return r.patchBackupRepository(ctx, req, repoNotReady(err.Error()))
|
||||
}
|
||||
|
||||
repoIdentifier, err := repoconfig.GetRepoIdentifier(loc, req.Spec.VolumeNamespace)
|
||||
repoIdentifier, err := r.getIdentiferByBSL(ctx, req)
|
||||
if err != nil {
|
||||
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
|
||||
rr.Status.Message = err.Error()
|
||||
|
@ -210,12 +302,20 @@ func dueForMaintenance(req *velerov1api.BackupRepository, now time.Time) bool {
|
|||
}
|
||||
|
||||
func (r *BackupRepoReconciler) checkNotReadyRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
|
||||
// no identifier: can't possibly be ready, so just return
|
||||
if req.Spec.ResticIdentifier == "" {
|
||||
return nil
|
||||
log.Info("Checking backup repository for readiness")
|
||||
|
||||
repoIdentifier, err := r.getIdentiferByBSL(ctx, req)
|
||||
if err != nil {
|
||||
return r.patchBackupRepository(ctx, req, repoNotReady(err.Error()))
|
||||
}
|
||||
|
||||
log.Info("Checking backup repository for readiness")
|
||||
if repoIdentifier != req.Spec.ResticIdentifier {
|
||||
if err := r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
|
||||
rr.Spec.ResticIdentifier = repoIdentifier
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// we need to ensure it (first check, if check fails, attempt to init)
|
||||
// because we don't know if it's been successfully initialized yet.
|
||||
|
|
|
@ -75,16 +75,28 @@ func TestPatchBackupRepository(t *testing.T) {
|
|||
|
||||
func TestCheckNotReadyRepo(t *testing.T) {
|
||||
rr := mockBackupRepositoryCR()
|
||||
rr.Spec.BackupStorageLocation = "default"
|
||||
rr.Spec.ResticIdentifier = "fake-identifier"
|
||||
rr.Spec.VolumeNamespace = "volume-ns-1"
|
||||
reconciler := mockBackupRepoReconciler(t, rr, "PrepareRepo", rr, nil)
|
||||
err := reconciler.Client.Create(context.TODO(), rr)
|
||||
assert.NoError(t, err)
|
||||
err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger)
|
||||
locations := &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
Config: map[string]string{"resticRepoPrefix": "s3:test.amazonaws.com/bucket/restic"},
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1api.DefaultNamespace,
|
||||
Name: rr.Spec.BackupStorageLocation,
|
||||
},
|
||||
}
|
||||
|
||||
err = reconciler.Client.Create(context.TODO(), locations)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, rr.Status.Phase, velerov1api.BackupRepositoryPhase(""))
|
||||
rr.Spec.ResticIdentifier = "s3:test.amazonaws.com/bucket/restic"
|
||||
err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, rr.Status.Phase, velerov1api.BackupRepositoryPhaseReady)
|
||||
assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier)
|
||||
}
|
||||
|
||||
func TestRunMaintenanceIfDue(t *testing.T) {
|
||||
|
@ -240,3 +252,129 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNeedInvalidBackupRepo(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
oldBSL *velerov1api.BackupStorageLocation
|
||||
newBSL *velerov1api.BackupStorageLocation
|
||||
expect bool
|
||||
}{
|
||||
{
|
||||
name: "no change",
|
||||
oldBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
Provider: "old-provider",
|
||||
},
|
||||
},
|
||||
newBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
Provider: "new-provider",
|
||||
},
|
||||
},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
name: "other part change",
|
||||
oldBSL: &velerov1api.BackupStorageLocation{},
|
||||
newBSL: &velerov1api.BackupStorageLocation{},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
name: "bucket change",
|
||||
oldBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
StorageType: velerov1api.StorageType{
|
||||
ObjectStorage: &velerov1api.ObjectStorageLocation{
|
||||
Bucket: "old-bucket",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
newBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
StorageType: velerov1api.StorageType{
|
||||
ObjectStorage: &velerov1api.ObjectStorageLocation{
|
||||
Bucket: "new-bucket",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "prefix change",
|
||||
oldBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
StorageType: velerov1api.StorageType{
|
||||
ObjectStorage: &velerov1api.ObjectStorageLocation{
|
||||
Prefix: "old-prefix",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
newBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
StorageType: velerov1api.StorageType{
|
||||
ObjectStorage: &velerov1api.ObjectStorageLocation{
|
||||
Prefix: "new-prefix",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "CACert change",
|
||||
oldBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
StorageType: velerov1api.StorageType{
|
||||
ObjectStorage: &velerov1api.ObjectStorageLocation{
|
||||
CACert: []byte{0x11, 0x12, 0x13},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
newBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
StorageType: velerov1api.StorageType{
|
||||
ObjectStorage: &velerov1api.ObjectStorageLocation{
|
||||
CACert: []byte{0x21, 0x22, 0x23},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
name: "config change",
|
||||
oldBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
Config: map[string]string{
|
||||
"key1": "value1",
|
||||
},
|
||||
},
|
||||
},
|
||||
newBSL: &velerov1api.BackupStorageLocation{
|
||||
Spec: velerov1api.BackupStorageLocationSpec{
|
||||
Config: map[string]string{
|
||||
"key2": "value2",
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: true,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
reconciler := NewBackupRepoReconciler(
|
||||
velerov1api.DefaultNamespace,
|
||||
velerotest.NewLogger(),
|
||||
velerotest.NewFakeControllerRuntimeClient(t),
|
||||
time.Duration(0), nil)
|
||||
|
||||
need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL)
|
||||
assert.Equal(t, test.expect, need)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
Copyright the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kube
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
)
|
||||
|
||||
type MapUpdateFunc func(client.Object) []reconcile.Request
|
||||
|
||||
// EnqueueRequestsFromMapUpdateFunc is for the same purpose with EnqueueRequestsFromMapFunc.
|
||||
// Merely, it is more friendly to updating the mapped objects in the MapUpdateFunc, because
|
||||
// on Update event, MapUpdateFunc is called for only once with the new object, so if MapUpdateFunc
|
||||
// does some update to the mapped objects, the update is done for once
|
||||
func EnqueueRequestsFromMapUpdateFunc(fn MapUpdateFunc) handler.EventHandler {
|
||||
return &enqueueRequestsFromMapFunc{
|
||||
toRequests: fn,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = &enqueueRequestsFromMapFunc{}
|
||||
|
||||
type enqueueRequestsFromMapFunc struct {
|
||||
toRequests MapUpdateFunc
|
||||
}
|
||||
|
||||
// Create implements EventHandler.
|
||||
func (e *enqueueRequestsFromMapFunc) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
|
||||
e.mapAndEnqueue(q, evt.Object)
|
||||
}
|
||||
|
||||
// Update implements EventHandler.
|
||||
func (e *enqueueRequestsFromMapFunc) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
|
||||
e.mapAndEnqueue(q, evt.ObjectNew)
|
||||
}
|
||||
|
||||
// Delete implements EventHandler.
|
||||
func (e *enqueueRequestsFromMapFunc) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
|
||||
e.mapAndEnqueue(q, evt.Object)
|
||||
}
|
||||
|
||||
// Generic implements EventHandler.
|
||||
func (e *enqueueRequestsFromMapFunc) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
|
||||
e.mapAndEnqueue(q, evt.Object)
|
||||
}
|
||||
|
||||
func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object) {
|
||||
reqs := map[reconcile.Request]struct{}{}
|
||||
|
||||
for _, req := range e.toRequests(object) {
|
||||
_, ok := reqs[req]
|
||||
if !ok {
|
||||
q.Add(req)
|
||||
reqs[req] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -37,7 +37,7 @@ func IsPodRunning(pod *corev1api.Pod) error {
|
|||
func IsPodScheduled(pod *corev1api.Pod) error {
|
||||
return isPodScheduledInStatus(pod, func(pod *corev1api.Pod) error {
|
||||
if pod.Status.Phase != corev1api.PodRunning && pod.Status.Phase != corev1api.PodPending {
|
||||
return errors.New("pod is running or pending")
|
||||
return errors.New("pod is not running or pending")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -74,6 +74,25 @@ func NewAllEventPredicate(f func(object client.Object) bool) predicate.Predicate
|
|||
}
|
||||
}
|
||||
|
||||
// NewUpdateEventPredicate creates a new Predicate that checks the update events with the provided func
|
||||
// and ignore others
|
||||
func NewUpdateEventPredicate(f func(client.Object, client.Object) bool) predicate.Predicate {
|
||||
return predicate.Funcs{
|
||||
UpdateFunc: func(event event.UpdateEvent) bool {
|
||||
return f(event.ObjectOld, event.ObjectNew)
|
||||
},
|
||||
CreateFunc: func(event event.CreateEvent) bool {
|
||||
return false
|
||||
},
|
||||
DeleteFunc: func(event event.DeleteEvent) bool {
|
||||
return false
|
||||
},
|
||||
GenericFunc: func(event event.GenericEvent) bool {
|
||||
return false
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// FalsePredicate always returns false for all kinds of events
|
||||
type FalsePredicate struct{}
|
||||
|
||||
|
|
Loading…
Reference in New Issue