Merge pull request #4894 from blackpiglet/bsl-refactor
Refactor BSL controller with periodical enqueue sourcepull/4911/head
commit
879d03398b
|
@ -0,0 +1 @@
|
|||
Refactor BSL controller with periodical enqueue source
|
|
@ -29,11 +29,18 @@ import (
|
|||
"sigs.k8s.io/cluster-api/util/patch"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/storage"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/persistence"
|
||||
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
const (
|
||||
backupStorageLocationSyncPeriod = 1 * time.Minute
|
||||
)
|
||||
|
||||
// BackupStorageLocationReconciler reconciles a BackupStorageLocation object
|
||||
|
@ -53,96 +60,92 @@ type BackupStorageLocationReconciler struct {
|
|||
// +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations/status,verbs=get;update;patch
|
||||
func (r *BackupStorageLocationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
log := r.Log.WithField("controller", BackupStorageLocation)
|
||||
var unavailableErrors []string
|
||||
var location velerov1api.BackupStorageLocation
|
||||
|
||||
log.Debug("Validating availability of backup storage locations.")
|
||||
log := r.Log.WithField("controller", BackupStorageLocation).WithField(BackupStorageLocation, req.NamespacedName.String())
|
||||
log.Debug("Validating availability of BackupStorageLocation")
|
||||
|
||||
locationList, err := storage.ListBackupStorageLocations(r.Ctx, r.Client, req.Namespace)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("No backup storage locations found, at least one is required")
|
||||
return ctrl.Result{}, err
|
||||
log.WithError(err).Error("No BackupStorageLocations found, at least one is required")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
pluginManager := r.NewPluginManager(log)
|
||||
defer pluginManager.CleanupClients()
|
||||
|
||||
var defaultFound bool
|
||||
for _, location := range locationList.Items {
|
||||
if location.Spec.Default {
|
||||
for _, bsl := range locationList.Items {
|
||||
if bsl.Spec.Default {
|
||||
defaultFound = true
|
||||
break
|
||||
}
|
||||
if bsl.Name == req.Name && bsl.Namespace == req.Namespace {
|
||||
location = bsl
|
||||
}
|
||||
}
|
||||
|
||||
var unavailableErrors []string
|
||||
var anyVerified bool
|
||||
for i := range locationList.Items {
|
||||
location := &locationList.Items[i]
|
||||
isDefault := location.Spec.Default
|
||||
log := r.Log.WithField("controller", BackupStorageLocation).WithField(BackupStorageLocation, location.Name)
|
||||
if location.Name == "" || location.Namespace == "" {
|
||||
log.WithError(err).Error("BackupStorageLocation is not found")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// TODO(2.0) remove this check since the server default will be deprecated
|
||||
if !defaultFound && location.Name == r.DefaultBackupLocationInfo.StorageLocation {
|
||||
// For backward-compatible, to configure the backup storage location as the default if
|
||||
// none of the BSLs be marked as the default and the BSL name matches against the
|
||||
// "velero server --default-backup-storage-location".
|
||||
isDefault = true
|
||||
defaultFound = true
|
||||
isDefault := location.Spec.Default
|
||||
|
||||
// TODO(2.0) remove this check since the server default will be deprecated
|
||||
if !defaultFound && location.Name == r.DefaultBackupLocationInfo.StorageLocation {
|
||||
// For backward-compatible, to configure the backup storage location as the default if
|
||||
// none of the BSLs be marked as the default and the BSL name matches against the
|
||||
// "velero server --default-backup-storage-location".
|
||||
isDefault = true
|
||||
defaultFound = true
|
||||
}
|
||||
|
||||
func() {
|
||||
// Initialize the patch helper.
|
||||
patchHelper, err := patch.NewHelper(&location, r.Client)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error getting a patch helper to update BackupStorageLocation")
|
||||
return
|
||||
}
|
||||
|
||||
if !storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.DefaultBackupLocationInfo.ServerValidationFrequency, log) {
|
||||
log.Debug("Validation not required, skipping...")
|
||||
continue
|
||||
}
|
||||
|
||||
anyVerified = true
|
||||
|
||||
func() {
|
||||
// Initialize the patch helper.
|
||||
patchHelper, err := patch.NewHelper(location, r.Client)
|
||||
defer func() {
|
||||
location.Status.LastValidationTime = &metav1.Time{Time: time.Now().UTC()}
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error getting a patch helper to update this resource")
|
||||
return
|
||||
log.Info("BackupStorageLocation is invalid, marking as unavailable")
|
||||
err = errors.Wrapf(err, "BackupStorageLocation %q is unavailable", location.Name)
|
||||
unavailableErrors = append(unavailableErrors, err.Error())
|
||||
location.Status.Phase = velerov1api.BackupStorageLocationPhaseUnavailable
|
||||
location.Status.Message = err.Error()
|
||||
} else {
|
||||
log.Info("BackupStorageLocations is valid, marking as available")
|
||||
location.Status.Phase = velerov1api.BackupStorageLocationPhaseAvailable
|
||||
location.Status.Message = ""
|
||||
}
|
||||
defer func() {
|
||||
location.Status.LastValidationTime = &metav1.Time{Time: time.Now().UTC()}
|
||||
if err != nil {
|
||||
log.Info("Backup storage location is invalid, marking as unavailable")
|
||||
err = errors.Wrapf(err, "Backup storage location %q is unavailable", location.Name)
|
||||
unavailableErrors = append(unavailableErrors, err.Error())
|
||||
location.Status.Phase = velerov1api.BackupStorageLocationPhaseUnavailable
|
||||
location.Status.Message = err.Error()
|
||||
} else {
|
||||
log.Info("Backup storage location valid, marking as available")
|
||||
location.Status.Phase = velerov1api.BackupStorageLocationPhaseAvailable
|
||||
location.Status.Message = ""
|
||||
}
|
||||
if err := patchHelper.Patch(r.Ctx, location); err != nil {
|
||||
log.WithError(err).Error("Error updating backup storage location phase")
|
||||
}
|
||||
}()
|
||||
|
||||
backupStore, err := r.BackupStoreGetter.Get(location, pluginManager, log)
|
||||
if err != nil {
|
||||
err = errors.Wrapf(err, "Error getting a backup store")
|
||||
return
|
||||
if err := patchHelper.Patch(r.Ctx, &location); err != nil {
|
||||
log.WithError(err).Error("Error updating BackupStorageLocation phase")
|
||||
}
|
||||
|
||||
// updates the default backup location
|
||||
location.Spec.Default = isDefault
|
||||
|
||||
log.Info("Validating backup storage location")
|
||||
err = backupStore.IsValid()
|
||||
}()
|
||||
}
|
||||
|
||||
if !anyVerified {
|
||||
log.Debug("No backup storage locations needed to be validated")
|
||||
}
|
||||
backupStore, err := r.BackupStoreGetter.Get(&location, pluginManager, log)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error getting a backup store")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("Validating BackupStorageLocation")
|
||||
err = backupStore.IsValid()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("fail to validate backup store")
|
||||
return
|
||||
}
|
||||
|
||||
// updates the default backup location
|
||||
location.Spec.Default = isDefault
|
||||
}()
|
||||
|
||||
r.logReconciledPhase(defaultFound, locationList, unavailableErrors)
|
||||
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *BackupStorageLocationReconciler) logReconciledPhase(defaultFound bool, locationList velerov1api.BackupStorageLocationList, errs []string) {
|
||||
|
@ -169,21 +172,48 @@ func (r *BackupStorageLocationReconciler) logReconciledPhase(defaultFound bool,
|
|||
|
||||
if numUnavailable+numUnknown == len(locationList.Items) { // no available BSL
|
||||
if len(errs) > 0 {
|
||||
log.Errorf("Current backup storage locations available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; "))
|
||||
log.Errorf("Current BackupStorageLocations available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; "))
|
||||
} else {
|
||||
log.Errorf("Current backup storage locations available/unavailable/unknown: %v/%v/%v)", numAvailable, numUnavailable, numUnknown)
|
||||
log.Errorf("Current BackupStorageLocations available/unavailable/unknown: %v/%v/%v)", numAvailable, numUnavailable, numUnknown)
|
||||
}
|
||||
} else if numUnavailable > 0 { // some but not all BSL unavailable
|
||||
log.Warnf("Unavailable backup storage locations detected: available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; "))
|
||||
log.Warnf("Unavailable BackupStorageLocations detected: available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; "))
|
||||
}
|
||||
|
||||
if !defaultFound {
|
||||
log.Warn("There is no existing backup storage location set as default. Please see `velero backup-location -h` for options.")
|
||||
log.Warn("There is no existing BackupStorageLocation set as default. Please see `velero backup-location -h` for options.")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
g := kube.NewPeriodicalEnqueueSource(
|
||||
r.Log,
|
||||
mgr.GetClient(),
|
||||
&velerov1api.BackupStorageLocationList{},
|
||||
backupStorageLocationSyncPeriod,
|
||||
// Add filter function to enqueue BSL per ValidationFrequency setting.
|
||||
func(object client.Object) bool {
|
||||
location := object.(*velerov1api.BackupStorageLocation)
|
||||
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.DefaultBackupLocationInfo.ServerValidationFrequency, r.Log.WithField("controller", BackupStorageLocation))
|
||||
},
|
||||
)
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&velerov1api.BackupStorageLocation{}).
|
||||
// Handle BSL's creation event and spec update event to let changed BSL got validation immediately.
|
||||
WithEventFilter(predicate.Funcs{
|
||||
CreateFunc: func(ce event.CreateEvent) bool {
|
||||
return true
|
||||
},
|
||||
UpdateFunc: func(ue event.UpdateEvent) bool {
|
||||
return ue.ObjectNew.GetGeneration() != ue.ObjectOld.GetGeneration()
|
||||
},
|
||||
DeleteFunc: func(de event.DeleteEvent) bool {
|
||||
return false
|
||||
},
|
||||
GenericFunc: func(ge event.GenericEvent) bool {
|
||||
return false
|
||||
},
|
||||
}).
|
||||
Watches(g, nil).
|
||||
Complete(r)
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ var _ = Describe("Backup Storage Location Reconciler", func() {
|
|||
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
|
||||
r := BackupStorageLocationReconciler{
|
||||
Ctx: ctx,
|
||||
Client: fake.NewFakeClientWithScheme(scheme.Scheme, locations),
|
||||
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(),
|
||||
DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{
|
||||
StorageLocation: "location-1",
|
||||
ServerValidationFrequency: 0,
|
||||
|
@ -91,18 +91,17 @@ var _ = Describe("Backup Storage Location Reconciler", func() {
|
|||
Log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
actualResult, err := r.Reconcile(ctx, ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{Namespace: "ns-1"},
|
||||
})
|
||||
|
||||
Expect(actualResult).To(BeEquivalentTo(ctrl.Result{Requeue: true}))
|
||||
Expect(err).To(BeNil())
|
||||
|
||||
// Assertions
|
||||
for i, location := range locations.Items {
|
||||
actualResult, err := r.Reconcile(ctx, ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{Namespace: location.Namespace, Name: location.Name},
|
||||
})
|
||||
Expect(actualResult).To(BeEquivalentTo(ctrl.Result{}))
|
||||
Expect(err).To(BeNil())
|
||||
|
||||
key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace}
|
||||
instance := &velerov1api.BackupStorageLocation{}
|
||||
err := r.Client.Get(ctx, key, instance)
|
||||
err = r.Client.Get(ctx, key, instance)
|
||||
Expect(err).To(BeNil())
|
||||
Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault))
|
||||
Expect(instance.Status.Phase).To(BeIdenticalTo(tests[i].expectedPhase))
|
||||
|
@ -147,7 +146,7 @@ var _ = Describe("Backup Storage Location Reconciler", func() {
|
|||
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
|
||||
r := BackupStorageLocationReconciler{
|
||||
Ctx: ctx,
|
||||
Client: fake.NewFakeClientWithScheme(scheme.Scheme, locations),
|
||||
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(),
|
||||
DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{
|
||||
StorageLocation: "default",
|
||||
ServerValidationFrequency: 0,
|
||||
|
@ -157,91 +156,19 @@ var _ = Describe("Backup Storage Location Reconciler", func() {
|
|||
Log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
actualResult, err := r.Reconcile(ctx, ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{Namespace: "ns-1"},
|
||||
})
|
||||
|
||||
Expect(actualResult).To(BeEquivalentTo(ctrl.Result{Requeue: true}))
|
||||
Expect(err).To(BeNil())
|
||||
|
||||
// Assertions
|
||||
for i, location := range locations.Items {
|
||||
actualResult, err := r.Reconcile(ctx, ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{Namespace: location.Namespace, Name: location.Name},
|
||||
})
|
||||
Expect(actualResult).To(BeEquivalentTo(ctrl.Result{}))
|
||||
Expect(err).To(BeNil())
|
||||
|
||||
key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace}
|
||||
instance := &velerov1api.BackupStorageLocation{}
|
||||
err := r.Client.Get(ctx, key, instance)
|
||||
err = r.Client.Get(ctx, key, instance)
|
||||
Expect(err).To(BeNil())
|
||||
Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault))
|
||||
}
|
||||
})
|
||||
|
||||
It("Should not patch a backup storage location object status phase if the location's validation frequency is specifically set to zero", func() {
|
||||
tests := []struct {
|
||||
backupLocation *velerov1api.BackupStorageLocation
|
||||
isValidError error
|
||||
expectedIsDefault bool
|
||||
expectedPhase velerov1api.BackupStorageLocationPhase
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
backupLocation: builder.ForBackupStorageLocation("ns-1", "location-1").ValidationFrequency(0).LastValidationTime(time.Now()).Result(),
|
||||
isValidError: nil,
|
||||
expectedIsDefault: false,
|
||||
expectedPhase: "",
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
backupLocation: builder.ForBackupStorageLocation("ns-1", "location-2").ValidationFrequency(0).LastValidationTime(time.Now()).Result(),
|
||||
isValidError: nil,
|
||||
expectedIsDefault: false,
|
||||
expectedPhase: "",
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
// Setup
|
||||
var (
|
||||
pluginManager = &pluginmocks.Manager{}
|
||||
backupStores = make(map[string]*persistencemocks.BackupStore)
|
||||
)
|
||||
pluginManager.On("CleanupClients").Return(nil)
|
||||
|
||||
locations := new(velerov1api.BackupStorageLocationList)
|
||||
for i, test := range tests {
|
||||
location := test.backupLocation
|
||||
locations.Items = append(locations.Items, *location)
|
||||
backupStores[location.Name] = &persistencemocks.BackupStore{}
|
||||
backupStores[location.Name].On("IsValid").Return(tests[i].isValidError)
|
||||
}
|
||||
|
||||
// Setup reconciler
|
||||
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
|
||||
r := BackupStorageLocationReconciler{
|
||||
Ctx: ctx,
|
||||
Client: fake.NewFakeClientWithScheme(scheme.Scheme, locations),
|
||||
DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{
|
||||
StorageLocation: "default",
|
||||
ServerValidationFrequency: 0,
|
||||
},
|
||||
NewPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
|
||||
BackupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores),
|
||||
Log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
actualResult, err := r.Reconcile(ctx, ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{Namespace: "ns-1"},
|
||||
})
|
||||
|
||||
Expect(actualResult).To(BeEquivalentTo(ctrl.Result{Requeue: true}))
|
||||
Expect(err).To(BeNil())
|
||||
|
||||
// Assertions
|
||||
for i, location := range locations.Items {
|
||||
key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace}
|
||||
instance := &velerov1api.BackupStorageLocation{}
|
||||
err := r.Client.Get(ctx, key, instance)
|
||||
Expect(err).To(BeNil())
|
||||
Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault))
|
||||
Expect(instance.Status.Phase).To(BeIdenticalTo(tests[i].expectedPhase))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
|
|
@ -34,12 +34,13 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
)
|
||||
|
||||
func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration) *PeriodicalEnqueueSource {
|
||||
func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration, filters ...func(object client.Object) bool) *PeriodicalEnqueueSource {
|
||||
return &PeriodicalEnqueueSource{
|
||||
logger: logger.WithField("resource", reflect.TypeOf(objList).String()),
|
||||
Client: client,
|
||||
objList: objList,
|
||||
period: period,
|
||||
logger: logger.WithField("resource", reflect.TypeOf(objList).String()),
|
||||
Client: client,
|
||||
objList: objList,
|
||||
period: period,
|
||||
filterFuncs: filters,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -48,9 +49,10 @@ func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client,
|
|||
// the reconcile logic periodically
|
||||
type PeriodicalEnqueueSource struct {
|
||||
client.Client
|
||||
logger logrus.FieldLogger
|
||||
objList client.ObjectList
|
||||
period time.Duration
|
||||
logger logrus.FieldLogger
|
||||
objList client.ObjectList
|
||||
period time.Duration
|
||||
filterFuncs []func(object client.Object) bool
|
||||
}
|
||||
|
||||
func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, pre ...predicate.Predicate) error {
|
||||
|
@ -70,6 +72,14 @@ func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHand
|
|||
p.logger.Error("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String())
|
||||
return nil
|
||||
}
|
||||
for _, filter := range p.filterFuncs {
|
||||
if filter != nil {
|
||||
if enqueueObj := filter(object.(client.Object)); !enqueueObj {
|
||||
p.logger.Debugf("skip enqueue object %s/%s due to filter function.", obj.GetNamespace(), obj.GetName())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
q.Add(ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: obj.GetNamespace(),
|
||||
|
|
|
@ -26,8 +26,10 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
crclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/storage"
|
||||
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
|
@ -43,7 +45,7 @@ func TestStart(t *testing.T) {
|
|||
|
||||
// no resources
|
||||
time.Sleep(1 * time.Second)
|
||||
require.Equal(t, queue.Len(), 0)
|
||||
require.Equal(t, 0, queue.Len())
|
||||
|
||||
// contain one resource
|
||||
require.Nil(t, client.Create(ctx, &velerov1.Schedule{
|
||||
|
@ -52,13 +54,53 @@ func TestStart(t *testing.T) {
|
|||
},
|
||||
}))
|
||||
time.Sleep(2 * time.Second)
|
||||
require.Equal(t, queue.Len(), 1)
|
||||
require.Equal(t, 1, queue.Len())
|
||||
|
||||
// context canceled, the enqueue source shouldn't run anymore
|
||||
item, _ := queue.Get()
|
||||
queue.Forget(item)
|
||||
require.Equal(t, queue.Len(), 0)
|
||||
require.Equal(t, 0, queue.Len())
|
||||
cancelFunc()
|
||||
time.Sleep(2 * time.Second)
|
||||
require.Equal(t, queue.Len(), 0)
|
||||
require.Equal(t, 0, queue.Len())
|
||||
}
|
||||
|
||||
func TestFilter(t *testing.T) {
|
||||
require.Nil(t, velerov1.AddToScheme(scheme.Scheme))
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.TODO())
|
||||
client := (&fake.ClientBuilder{}).Build()
|
||||
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
|
||||
source := NewPeriodicalEnqueueSource(
|
||||
logrus.WithContext(ctx),
|
||||
client,
|
||||
&velerov1.BackupStorageLocationList{},
|
||||
1*time.Second,
|
||||
func(object crclient.Object) bool {
|
||||
location := object.(*velerov1.BackupStorageLocation)
|
||||
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name))
|
||||
},
|
||||
)
|
||||
|
||||
require.Nil(t, source.Start(ctx, nil, queue))
|
||||
|
||||
// Should not patch a backup storage location object status phase
|
||||
// if the location's validation frequency is specifically set to zero
|
||||
require.Nil(t, client.Create(ctx, &velerov1.BackupStorageLocation{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "location1",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: velerov1.BackupStorageLocationSpec{
|
||||
ValidationFrequency: &metav1.Duration{Duration: 0},
|
||||
},
|
||||
Status: velerov1.BackupStorageLocationStatus{
|
||||
LastValidationTime: &metav1.Time{Time: time.Now()},
|
||||
},
|
||||
}))
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
require.Equal(t, 0, queue.Len())
|
||||
|
||||
cancelFunc()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue