Merge branch 'main' into vgdp-ms-pvb-data-path

pull/8998/head
Lyndon-Li 2025-06-03 13:26:52 +08:00
commit 5ccf22e0b0
14 changed files with 142 additions and 57 deletions

View File

@ -0,0 +1 @@
Add support for configuring VGS label key

View File

@ -0,0 +1 @@
Mark BackupRepository not ready when BSL changed

View File

@ -507,6 +507,10 @@ spec:
uploads to perform when using the uploader.
type: integer
type: object
volumeGroupSnapshotLabelKey:
description: VolumeGroupSnapshotLabelKey specifies the label key to
group PVCs under a VGS.
type: string
volumeSnapshotLocations:
description: VolumeSnapshotLocations is a list containing names of
VolumeSnapshotLocations associated with this backup.

View File

@ -549,6 +549,10 @@ spec:
uploads to perform when using the uploader.
type: integer
type: object
volumeGroupSnapshotLabelKey:
description: VolumeGroupSnapshotLabelKey specifies the label key
to group PVCs under a VGS.
type: string
volumeSnapshotLocations:
description: VolumeSnapshotLocations is a list containing names
of VolumeSnapshotLocations associated with this backup.

File diff suppressed because one or more lines are too long

View File

@ -113,6 +113,10 @@ type BackupSpec struct {
// +optional
TTL metav1.Duration `json:"ttl,omitempty"`
// VolumeGroupSnapshotLabelKey specifies the label key to group PVCs under a VGS.
// +optional
VolumeGroupSnapshotLabelKey string `json:"volumeGroupSnapshotLabelKey,omitempty"`
// IncludeClusterResources specifies whether cluster-scoped resources
// should be included for consideration in the backup.
// +optional

View File

@ -240,6 +240,12 @@ func (b *BackupBuilder) TTL(ttl time.Duration) *BackupBuilder {
return b
}
// VolumeGroupSnapshotLabelKey sets the label key to group PVCs for VolumeGroupSnapshot.
func (b *BackupBuilder) VolumeGroupSnapshotLabelKey(labelKey string) *BackupBuilder {
b.object.Spec.VolumeGroupSnapshotLabelKey = labelKey
return b
}
// Expiration sets the Backup's expiration.
func (b *BackupBuilder) Expiration(val time.Time) *BackupBuilder {
b.object.Status.Expiration = &metav1.Time{Time: val}

View File

@ -37,6 +37,9 @@ const (
// the default TTL for a backup
defaultBackupTTL = 30 * 24 * time.Hour
// defaultVGSLabelKey is the default label key used to group PVCs under a VolumeGroupSnapshot
defaultVGSLabelKey = "velero.io/volume-group-snapshot"
defaultCSISnapshotTimeout = 10 * time.Minute
defaultItemOperationTimeout = 4 * time.Hour
@ -150,6 +153,7 @@ type Config struct {
PodVolumeOperationTimeout time.Duration
ResourceTerminatingTimeout time.Duration
DefaultBackupTTL time.Duration
DefaultVGSLabelKey string
StoreValidationFrequency time.Duration
DefaultCSISnapshotTimeout time.Duration
DefaultItemOperationTimeout time.Duration
@ -189,6 +193,7 @@ func GetDefaultConfig() *Config {
DefaultVolumeSnapshotLocations: flag.NewMap().WithKeyValueDelimiter(':'),
BackupSyncPeriod: defaultBackupSyncPeriod,
DefaultBackupTTL: defaultBackupTTL,
DefaultVGSLabelKey: defaultVGSLabelKey,
DefaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
DefaultItemOperationTimeout: defaultItemOperationTimeout,
ResourceTimeout: resourceTimeout,
@ -240,6 +245,7 @@ func (c *Config) BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&c.ProfilerAddress, "profiler-address", c.ProfilerAddress, "The address to expose the pprof profiler.")
flags.DurationVar(&c.ResourceTerminatingTimeout, "terminating-resource-timeout", c.ResourceTerminatingTimeout, "How long to wait on persistent volumes and namespaces to terminate during a restore before timing out.")
flags.DurationVar(&c.DefaultBackupTTL, "default-backup-ttl", c.DefaultBackupTTL, "How long to wait by default before backups can be garbage collected.")
flags.StringVar(&c.DefaultVGSLabelKey, "volume-group-snapshot-label-key", c.DefaultVGSLabelKey, "Label key for grouping PVCs into VolumeGroupSnapshot. Default value is 'velero.io/volume-group-snapshot'")
flags.DurationVar(&c.RepoMaintenanceFrequency, "default-repo-maintain-frequency", c.RepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default.")
flags.DurationVar(&c.GarbageCollectionFrequency, "garbage-collection-frequency", c.GarbageCollectionFrequency, "How often garbage collection is run for expired backups.")
flags.DurationVar(&c.ItemOperationSyncFrequency, "item-operation-sync-frequency", c.ItemOperationSyncFrequency, "How often to check status on backup/restore operations after backup/restore processing. Default is 10 seconds")

View File

@ -634,6 +634,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.config.DefaultBackupLocation,
s.config.DefaultVolumesToFsBackup,
s.config.DefaultBackupTTL,
s.config.DefaultVGSLabelKey,
s.config.DefaultCSISnapshotTimeout,
s.config.ResourceTimeout,
s.config.DefaultItemOperationTimeout,

View File

@ -75,6 +75,7 @@ type backupReconciler struct {
defaultBackupLocation string
defaultVolumesToFsBackup bool
defaultBackupTTL time.Duration
defaultVGSLabelKey string
defaultCSISnapshotTimeout time.Duration
resourceTimeout time.Duration
defaultItemOperationTimeout time.Duration
@ -102,6 +103,7 @@ func NewBackupReconciler(
defaultBackupLocation string,
defaultVolumesToFsBackup bool,
defaultBackupTTL time.Duration,
defaultVGSLabelKey string,
defaultCSISnapshotTimeout time.Duration,
resourceTimeout time.Duration,
defaultItemOperationTimeout time.Duration,
@ -128,6 +130,7 @@ func NewBackupReconciler(
defaultBackupLocation: defaultBackupLocation,
defaultVolumesToFsBackup: defaultVolumesToFsBackup,
defaultBackupTTL: defaultBackupTTL,
defaultVGSLabelKey: defaultVGSLabelKey,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
resourceTimeout: resourceTimeout,
defaultItemOperationTimeout: defaultItemOperationTimeout,
@ -347,6 +350,10 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
request.Spec.TTL.Duration = b.defaultBackupTTL
}
if len(request.Spec.VolumeGroupSnapshotLabelKey) == 0 {
request.Spec.VolumeGroupSnapshotLabelKey = b.defaultVGSLabelKey
}
if request.Spec.CSISnapshotTimeout.Duration == 0 {
// set default CSI VolumeSnapshot timeout
request.Spec.CSISnapshotTimeout.Duration = b.defaultCSISnapshotTimeout

View File

@ -479,6 +479,69 @@ func TestDefaultBackupTTL(t *testing.T) {
}
}
func TestPrepareBackupRequest_SetsVGSLabelKey(t *testing.T) {
now, err := time.Parse(time.RFC1123Z, time.RFC1123Z)
require.NoError(t, err)
now = now.Local()
defaultVGSLabelKey := "velero.io/volume-group-snapshot"
tests := []struct {
name string
backup *velerov1api.Backup
serverFlagKey string
expectedLabelKey string
}{
{
name: "backup with spec label key set",
backup: builder.ForBackup("velero", "backup-1").
VolumeGroupSnapshotLabelKey("spec-key").
Result(),
serverFlagKey: "server-key",
expectedLabelKey: "spec-key",
},
{
name: "backup with no spec key, uses server flag",
backup: builder.ForBackup("velero", "backup-2").Result(),
serverFlagKey: "server-key",
expectedLabelKey: "server-key",
},
{
name: "backup with no spec or server flag, uses default",
backup: builder.ForBackup("velero", "backup-3").Result(),
serverFlagKey: defaultVGSLabelKey,
expectedLabelKey: defaultVGSLabelKey,
},
}
for _, test := range tests {
formatFlag := logging.FormatText
logger := logging.DefaultLogger(logrus.DebugLevel, formatFlag)
t.Run(test.name, func(t *testing.T) {
fakeClient := velerotest.NewFakeControllerRuntimeClient(t, test.backup)
apiServer := velerotest.NewAPIServer(t)
discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, logger)
require.NoError(t, err)
c := &backupReconciler{
logger: logger,
kbClient: fakeClient,
defaultVGSLabelKey: test.serverFlagKey,
discoveryHelper: discoveryHelper,
clock: testclocks.NewFakeClock(now),
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
assert.NotNil(t, res)
assert.Equal(t, test.expectedLabelKey, res.Spec.VolumeGroupSnapshotLabelKey)
})
}
}
func TestDefaultVolumesToResticDeprecation(t *testing.T) {
tests := []struct {
name string

View File

@ -31,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@ -104,14 +105,11 @@ func (r *BackupRepoReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&velerov1api.BackupRepository{}, builder.WithPredicates(kube.SpecChangePredicate{})).
WatchesRawSource(s).
Watches(
// mark BackupRepository as invalid when BSL is created, updated or deleted.
// BSL may be recreated after deleting, so also include the create event
&velerov1api.BackupStorageLocation{},
kube.EnqueueRequestsFromMapUpdateFunc(r.invalidateBackupReposForBSL),
builder.WithPredicates(
// When BSL updates, check if the backup repositories need to be invalidated
kube.NewUpdateEventPredicate(r.needInvalidBackupRepo),
// When BSL is created, invalidate any backup repositories that reference it
kube.NewCreateEventPredicate(func(client.Object) bool { return true }),
),
builder.WithPredicates(kube.NewUpdateEventPredicate(r.needInvalidBackupRepo)),
).
Complete(r)
}
@ -130,14 +128,17 @@ func (r *BackupRepoReconciler) invalidateBackupReposForBSL(ctx context.Context,
return []reconcile.Request{}
}
requests := []reconcile.Request{}
for i := range list.Items {
r.logger.WithField("BSL", bsl.Name).Infof("Invalidating Backup Repository %s", list.Items[i].Name)
if err := r.patchBackupRepository(context.Background(), &list.Items[i], repoNotReady("re-establish on BSL change or create")); err != nil {
if err := r.patchBackupRepository(context.Background(), &list.Items[i], repoNotReady("re-establish on BSL change, create or delete")); err != nil {
r.logger.WithField("BSL", bsl.Name).WithError(err).Errorf("fail to patch BackupRepository %s", list.Items[i].Name)
continue
}
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: list.Items[i].Namespace, Name: list.Items[i].Name}})
}
return []reconcile.Request{}
return requests
}
// needInvalidBackupRepo returns true if the BSL's storage type, bucket, prefix, CACert, or config has changed

View File

@ -47,15 +47,6 @@ func (SpecChangePredicate) Update(e event.UpdateEvent) bool {
return !reflect.DeepEqual(oldSpec.Interface(), newSpec.Interface())
}
// NewGenericEventPredicate creates a new Predicate that checks the Generic event with the provided func
func NewGenericEventPredicate(f func(object client.Object) bool) predicate.Predicate {
return predicate.Funcs{
GenericFunc: func(event event.GenericEvent) bool {
return f(event.Object)
},
}
}
// NewAllEventPredicate creates a new Predicate that checks all the events with the provided func
func NewAllEventPredicate(f func(object client.Object) bool) predicate.Predicate {
return predicate.Funcs{
@ -74,25 +65,6 @@ func NewAllEventPredicate(f func(object client.Object) bool) predicate.Predicate
}
}
// NewUpdateEventPredicate creates a new Predicate that checks the update events with the provided func
// and ignore others
func NewUpdateEventPredicate(f func(client.Object, client.Object) bool) predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(event event.UpdateEvent) bool {
return f(event.ObjectOld, event.ObjectNew)
},
CreateFunc: func(event event.CreateEvent) bool {
return false
},
DeleteFunc: func(event event.DeleteEvent) bool {
return false
},
GenericFunc: func(event event.GenericEvent) bool {
return false
},
}
}
// FalsePredicate always returns false for all kinds of events
type FalsePredicate struct{}
@ -116,19 +88,20 @@ func (f FalsePredicate) Generic(event.GenericEvent) bool {
return false
}
func NewCreateEventPredicate(f func(client.Object) bool) predicate.Predicate {
// NewGenericEventPredicate creates a new Predicate that checks the Generic event with the provided func
func NewGenericEventPredicate(f func(object client.Object) bool) predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
return f(event.Object)
},
DeleteFunc: func(event event.DeleteEvent) bool {
return false
},
GenericFunc: func(event event.GenericEvent) bool {
return false
},
UpdateFunc: func(event event.UpdateEvent) bool {
return false
return f(event.Object)
},
}
}
// NewUpdateEventPredicate creates a new Predicate that checks the Update event with the provided func
func NewUpdateEventPredicate(f func(objectOld client.Object, objectNew client.Object) bool) predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(event event.UpdateEvent) bool {
return f(event.ObjectOld, event.ObjectNew)
},
}
}

View File

@ -179,14 +179,6 @@ func TestSpecChangePredicate(t *testing.T) {
}
}
func TestNewGenericEventPredicate(t *testing.T) {
predicate := NewGenericEventPredicate(func(object client.Object) bool {
return false
})
assert.False(t, predicate.Generic(event.GenericEvent{}))
}
func TestNewAllEventPredicate(t *testing.T) {
predicate := NewAllEventPredicate(func(object client.Object) bool {
return false
@ -197,3 +189,25 @@ func TestNewAllEventPredicate(t *testing.T) {
assert.False(t, predicate.Delete(event.DeleteEvent{}))
assert.False(t, predicate.Generic(event.GenericEvent{}))
}
func TestNewGenericEventPredicate(t *testing.T) {
predicate := NewGenericEventPredicate(func(object client.Object) bool {
return false
})
assert.False(t, predicate.Generic(event.GenericEvent{}))
assert.True(t, predicate.Update(event.UpdateEvent{}))
assert.True(t, predicate.Create(event.CreateEvent{}))
assert.True(t, predicate.Delete(event.DeleteEvent{}))
}
func TestNewUpdateEventPredicate(t *testing.T) {
predicate := NewUpdateEventPredicate(func(client.Object, client.Object) bool {
return false
})
assert.False(t, predicate.Update(event.UpdateEvent{}))
assert.True(t, predicate.Create(event.CreateEvent{}))
assert.True(t, predicate.Delete(event.DeleteEvent{}))
assert.True(t, predicate.Generic(event.GenericEvent{}))
}