Merge pull request #9015 from Lyndon-Li/vgdp-ms-pvb-controller

VGDP MS PVB controller
pull/9021/head
Wenkai Yin(尹文开) 2025-06-16 13:10:44 +08:00 committed by GitHub
commit 7d8a36a6e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1630 additions and 607 deletions

View File

@ -0,0 +1 @@
Fix issue #8958, add VGDP MS PVB controller

View File

@ -19,34 +19,37 @@ spec:
jsonPath: .status.phase jsonPath: .status.phase
name: Status name: Status
type: string type: string
- description: Time when this backup was started - description: Time duration since this PodVolumeBackup was started
jsonPath: .status.startTimestamp jsonPath: .status.startTimestamp
name: Created name: Started
type: date type: date
- description: Namespace of the pod containing the volume to be backed up - description: Completed bytes
jsonPath: .spec.pod.namespace format: int64
name: Namespace jsonPath: .status.progress.bytesDone
type: string name: Bytes Done
- description: Name of the pod containing the volume to be backed up type: integer
jsonPath: .spec.pod.name - description: Total bytes
name: Pod format: int64
type: string jsonPath: .status.progress.totalBytes
- description: Name of the volume to be backed up name: Total Bytes
jsonPath: .spec.volume type: integer
name: Volume
type: string
- description: The type of the uploader to handle data transfer
jsonPath: .spec.uploaderType
name: Uploader Type
type: string
- description: Name of the Backup Storage Location where this backup should be - description: Name of the Backup Storage Location where this backup should be
stored stored
jsonPath: .spec.backupStorageLocation jsonPath: .spec.backupStorageLocation
name: Storage Location name: Storage Location
type: string type: string
- jsonPath: .metadata.creationTimestamp - description: Time duration since this PodVolumeBackup was created
jsonPath: .metadata.creationTimestamp
name: Age name: Age
type: date type: date
- description: Name of the node where the PodVolumeBackup is processed
jsonPath: .status.node
name: Node
type: string
- description: The type of the uploader to handle data transfer
jsonPath: .spec.uploaderType
name: Uploader
type: string
name: v1 name: v1
schema: schema:
openAPIV3Schema: openAPIV3Schema:
@ -170,6 +173,13 @@ spec:
status: status:
description: PodVolumeBackupStatus is the current status of a PodVolumeBackup. description: PodVolumeBackupStatus is the current status of a PodVolumeBackup.
properties: properties:
acceptedTimestamp:
description: |-
AcceptedTimestamp records the time the pod volume backup is to be prepared.
The server's time is used for AcceptedTimestamp
format: date-time
nullable: true
type: string
completionTimestamp: completionTimestamp:
description: |- description: |-
CompletionTimestamp records the time a backup was completed. CompletionTimestamp records the time a backup was completed.
@ -190,7 +200,11 @@ spec:
description: Phase is the current state of the PodVolumeBackup. description: Phase is the current state of the PodVolumeBackup.
enum: enum:
- New - New
- Accepted
- Prepared
- InProgress - InProgress
- Canceling
- Canceled
- Completed - Completed
- Failed - Failed
type: string type: string

File diff suppressed because one or more lines are too long

View File

@ -105,6 +105,9 @@ const (
// defaultVGSLabelKey is the default label key used to group PVCs under a VolumeGroupSnapshot // defaultVGSLabelKey is the default label key used to group PVCs under a VolumeGroupSnapshot
DefaultVGSLabelKey = "velero.io/volume-group" DefaultVGSLabelKey = "velero.io/volume-group"
// PVBLabel is the label key used to identify the pvb for pvb pod
PVBLabel = "velero.io/pod-volume-backup"
// PVRLabel is the label key used to identify the pvb for pvr pod // PVRLabel is the label key used to identify the pvb for pvr pod
PVRLabel = "velero.io/pod-volume-restore" PVRLabel = "velero.io/pod-volume-restore"
) )

View File

@ -64,12 +64,16 @@ type PodVolumeBackupSpec struct {
} }
// PodVolumeBackupPhase represents the lifecycle phase of a PodVolumeBackup. // PodVolumeBackupPhase represents the lifecycle phase of a PodVolumeBackup.
// +kubebuilder:validation:Enum=New;InProgress;Completed;Failed // +kubebuilder:validation:Enum=New;Accepted;Prepared;InProgress;Canceling;Canceled;Completed;Failed
type PodVolumeBackupPhase string type PodVolumeBackupPhase string
const ( const (
PodVolumeBackupPhaseNew PodVolumeBackupPhase = "New" PodVolumeBackupPhaseNew PodVolumeBackupPhase = "New"
PodVolumeBackupPhaseAccepted PodVolumeBackupPhase = "Accepted"
PodVolumeBackupPhasePrepared PodVolumeBackupPhase = "Prepared"
PodVolumeBackupPhaseInProgress PodVolumeBackupPhase = "InProgress" PodVolumeBackupPhaseInProgress PodVolumeBackupPhase = "InProgress"
PodVolumeBackupPhaseCanceling PodVolumeBackupPhase = "Canceling"
PodVolumeBackupPhaseCanceled PodVolumeBackupPhase = "Canceled"
PodVolumeBackupPhaseCompleted PodVolumeBackupPhase = "Completed" PodVolumeBackupPhaseCompleted PodVolumeBackupPhase = "Completed"
PodVolumeBackupPhaseFailed PodVolumeBackupPhase = "Failed" PodVolumeBackupPhaseFailed PodVolumeBackupPhase = "Failed"
) )
@ -113,20 +117,27 @@ type PodVolumeBackupStatus struct {
// about the backup operation. // about the backup operation.
// +optional // +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"` Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// AcceptedTimestamp records the time the pod volume backup is to be prepared.
// The server's time is used for AcceptedTimestamp
// +optional
// +nullable
AcceptedTimestamp *metav1.Time `json:"acceptedTimestamp,omitempty"`
} }
// TODO(2.0) After converting all resources to use the runttime-controller client, // TODO(2.0) After converting all resources to use the runttime-controller client,
// the genclient and k8s:deepcopy markers will no longer be needed and should be removed. // the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
// +genclient // +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:storageversion
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="PodVolumeBackup status such as New/InProgress" // +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="PodVolumeBackup status such as New/InProgress"
// +kubebuilder:printcolumn:name="Created",type="date",JSONPath=".status.startTimestamp",description="Time when this backup was started" // +kubebuilder:printcolumn:name="Started",type="date",JSONPath=".status.startTimestamp",description="Time duration since this PodVolumeBackup was started"
// +kubebuilder:printcolumn:name="Namespace",type="string",JSONPath=".spec.pod.namespace",description="Namespace of the pod containing the volume to be backed up" // +kubebuilder:printcolumn:name="Bytes Done",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Completed bytes"
// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".spec.pod.name",description="Name of the pod containing the volume to be backed up" // +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Volume",type="string",JSONPath=".spec.volume",description="Name of the volume to be backed up"
// +kubebuilder:printcolumn:name="Uploader Type",type="string",JSONPath=".spec.uploaderType",description="The type of the uploader to handle data transfer"
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where this backup should be stored" // +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where this backup should be stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this PodVolumeBackup was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the PodVolumeBackup is processed"
// +kubebuilder:printcolumn:name="Uploader",type="string",JSONPath=".spec.uploaderType",description="The type of the uploader to handle data transfer"
// +kubebuilder:object:root=true // +kubebuilder:object:root=true
// +kubebuilder:object:generate=true // +kubebuilder:object:generate=true

View File

@ -1043,6 +1043,10 @@ func (in *PodVolumeBackupStatus) DeepCopyInto(out *PodVolumeBackupStatus) {
*out = (*in).DeepCopy() *out = (*in).DeepCopy()
} }
out.Progress = in.Progress out.Progress = in.Progress
if in.AcceptedTimestamp != nil {
in, out := &in.AcceptedTimestamp, &out.AcceptedTimestamp
*out = (*in).DeepCopy()
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeBackupStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeBackupStatus.

View File

@ -119,3 +119,33 @@ func (b *PodVolumeBackupBuilder) Annotations(annotations map[string]string) *Pod
b.object.Annotations = annotations b.object.Annotations = annotations
return b return b
} }
// Cancel sets the PodVolumeBackup's Cancel.
func (b *PodVolumeBackupBuilder) Cancel(cancel bool) *PodVolumeBackupBuilder {
b.object.Spec.Cancel = cancel
return b
}
// AcceptedTimestamp sets the PodVolumeBackup's AcceptedTimestamp.
func (b *PodVolumeBackupBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) *PodVolumeBackupBuilder {
b.object.Status.AcceptedTimestamp = acceptedTimestamp
return b
}
// Finalizers sets the PodVolumeBackup's Finalizers.
func (b *PodVolumeBackupBuilder) Finalizers(finalizers []string) *PodVolumeBackupBuilder {
b.object.Finalizers = finalizers
return b
}
// Message sets the PodVolumeBackup's Message.
func (b *PodVolumeBackupBuilder) Message(msg string) *PodVolumeBackupBuilder {
b.object.Status.Message = msg
return b
}
// OwnerReference sets the PodVolumeBackup's OwnerReference.
func (b *PodVolumeBackupBuilder) OwnerReference(ref metav1.OwnerReference) *PodVolumeBackupBuilder {
b.object.OwnerReferences = append(b.object.OwnerReferences, ref)
return b
}

View File

@ -48,7 +48,6 @@ import (
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned" snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo" "github.com/vmware-tanzu/velero/pkg/buildinfo"
@ -60,7 +59,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube" "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging" "github.com/vmware-tanzu/velero/pkg/util/logging"
@ -282,30 +280,6 @@ func (s *nodeAgentServer) run() {
s.logger.Info("Starting controllers") s.logger.Info("Starting controllers")
credentialFileStore, err := credentials.NewNamespacedFileStore(
s.mgr.GetClient(),
s.namespace,
credentials.DefaultStoreDirectory(),
filesystem.NewFileSystem(),
)
if err != nil {
s.logger.Fatalf("Failed to create credentials file store: %v", err)
}
credSecretStore, err := credentials.NewNamespacedSecretStore(s.mgr.GetClient(), s.namespace)
if err != nil {
s.logger.Fatalf("Failed to create secret file store: %v", err)
}
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer,
credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger)
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
}
var loadAffinity *kube.LoadAffinity var loadAffinity *kube.LoadAffinity
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0] loadAffinity = s.dataPathConfigs.LoadAffinity[0]
@ -328,7 +302,12 @@ func (s *nodeAgentServer) run() {
} }
} }
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger).SetupWithManager(s.mgr); err != nil { pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger)
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
}
if err := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
} }
@ -347,7 +326,7 @@ func (s *nodeAgentServer) run() {
s.logger, s.logger,
s.metrics, s.metrics,
) )
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { if err := dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller") s.logger.WithError(err).Fatal("Unable to create the data upload controller")
} }
@ -358,7 +337,7 @@ func (s *nodeAgentServer) run() {
} }
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, restorePVCConfig, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, restorePVCConfig, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { if err := dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller") s.logger.WithError(err).Fatal("Unable to create the data download controller")
} }

View File

@ -312,30 +312,29 @@ func (f *fakeSnapshotExposer) DiagnoseExpose(context.Context, corev1api.ObjectRe
func (f *fakeSnapshotExposer) CleanUp(context.Context, corev1api.ObjectReference, string, string) { func (f *fakeSnapshotExposer) CleanUp(context.Context, corev1api.ObjectReference, string, string) {
} }
type fakeDataUploadFSBR struct { type fakeFSBR struct {
du *velerov2alpha1api.DataUpload
kubeClient kbclient.Client kubeClient kbclient.Client
clock clock.WithTickerAndDelayedExecution clock clock.WithTickerAndDelayedExecution
initErr error initErr error
startErr error startErr error
} }
func (f *fakeDataUploadFSBR) Init(ctx context.Context, param any) error { func (f *fakeFSBR) Init(ctx context.Context, param any) error {
return f.initErr return f.initErr
} }
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param any) error { func (f *fakeFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param any) error {
return f.startErr return f.startErr
} }
func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error { func (f *fakeFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error {
return nil return nil
} }
func (b *fakeDataUploadFSBR) Cancel() { func (b *fakeFSBR) Cancel() {
} }
func (b *fakeDataUploadFSBR) Close(ctx context.Context) { func (b *fakeFSBR) Close(ctx context.Context) {
} }
func TestReconcile(t *testing.T) { func TestReconcile(t *testing.T) {
@ -651,8 +650,7 @@ func TestReconcile(t *testing.T) {
} }
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
return &fakeDataUploadFSBR{ return &fakeFSBR{
du: test.du,
kubeClient: r.client, kubeClient: r.client,
clock: r.Clock, clock: r.Clock,
initErr: test.fsBRInitErr, initErr: test.fsBRInitErr,

View File

@ -27,23 +27,29 @@ import (
corev1api "k8s.io/api/core/v1" corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
clocks "k8s.io/utils/clock" clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/vmware-tanzu/velero/internal/credentials"
veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/podvolume" "github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util"
"github.com/vmware-tanzu/velero/pkg/util/kube"
) )
const ( const (
@ -52,36 +58,41 @@ const (
) )
// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance
func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, func NewPodVolumeBackupReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements,
metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler {
return &PodVolumeBackupReconciler{ return &PodVolumeBackupReconciler{
Client: client, client: client,
mgr: mgr,
kubeClient: kubeClient, kubeClient: kubeClient,
logger: logger.WithField("controller", "PodVolumeBackup"), logger: logger.WithField("controller", "PodVolumeBackup"),
repositoryEnsurer: ensurer,
credentialGetter: credentialGetter,
nodeName: nodeName, nodeName: nodeName,
fileSystem: filesystem.NewFileSystem(),
clock: &clocks.RealClock{}, clock: &clocks.RealClock{},
scheme: scheme,
metrics: metrics, metrics: metrics,
podResources: podResources,
dataPathMgr: dataPathMgr, dataPathMgr: dataPathMgr,
preparingTimeout: preparingTimeout,
resourceTimeout: resourceTimeout,
exposer: exposer.NewPodVolumeExposer(kubeClient, logger),
cancelledPVB: make(map[string]time.Time),
} }
} }
// PodVolumeBackupReconciler reconciles a PodVolumeBackup object // PodVolumeBackupReconciler reconciles a PodVolumeBackup object
type PodVolumeBackupReconciler struct { type PodVolumeBackupReconciler struct {
client.Client client client.Client
mgr manager.Manager
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
scheme *runtime.Scheme
clock clocks.WithTickerAndDelayedExecution clock clocks.WithTickerAndDelayedExecution
exposer exposer.PodVolumeExposer
metrics *metrics.ServerMetrics metrics *metrics.ServerMetrics
credentialGetter *credentials.CredentialGetter
repositoryEnsurer *repository.Ensurer
nodeName string nodeName string
fileSystem filesystem.Interface
logger logrus.FieldLogger logger logrus.FieldLogger
podResources corev1api.ResourceRequirements
dataPathMgr *datapath.Manager dataPathMgr *datapath.Manager
preparingTimeout time.Duration
resourceTimeout time.Duration
cancelledPVB map[string]time.Time
} }
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete
@ -93,13 +104,13 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
"podvolumebackup": req.NamespacedName, "podvolumebackup": req.NamespacedName,
}) })
var pvb velerov1api.PodVolumeBackup var pvb = &velerov1api.PodVolumeBackup{}
if err := r.Client.Get(ctx, req.NamespacedName, &pvb); err != nil { if err := r.client.Get(ctx, req.NamespacedName, pvb); err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
log.Debug("Unable to find PodVolumeBackup") log.Warn("Unable to find PVB, skip")
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
return ctrl.Result{}, errors.Wrap(err, "getting PodVolumeBackup") return ctrl.Result{}, errors.Wrap(err, "getting PVB")
} }
if len(pvb.OwnerReferences) == 1 { if len(pvb.OwnerReferences) == 1 {
log = log.WithField( log = log.WithField(
@ -108,20 +119,153 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
) )
} }
if !isPVBInFinalState(pvb) {
if !controllerutil.ContainsFinalizer(pvb, PodVolumeFinalizer) {
if err := UpdatePVBWithRetry(ctx, r.client, req.NamespacedName, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if controllerutil.ContainsFinalizer(pvb, PodVolumeFinalizer) {
return false
}
controllerutil.AddFinalizer(pvb, PodVolumeFinalizer)
return true
}); err != nil {
log.WithError(err).Errorf("Failed to add finalizer for PVB %s/%s", pvb.Namespace, pvb.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if !pvb.DeletionTimestamp.IsZero() {
if !pvb.Spec.Cancel {
log.Warnf("Cancel PVB under phase %s because it is being deleted", pvb.Status.Phase)
if err := UpdatePVBWithRetry(ctx, r.client, req.NamespacedName, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if pvb.Spec.Cancel {
return false
}
pvb.Spec.Cancel = true
pvb.Status.Message = "Cancel PVB because it is being deleted"
return true
}); err != nil {
log.WithError(err).Errorf("Failed to set cancel flag for PVB %s/%s", pvb.Namespace, pvb.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
} else {
delete(r.cancelledPVB, pvb.Name)
if controllerutil.ContainsFinalizer(pvb, PodVolumeFinalizer) {
if err := UpdatePVBWithRetry(ctx, r.client, req.NamespacedName, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if !controllerutil.ContainsFinalizer(pvb, PodVolumeFinalizer) {
return false
}
controllerutil.RemoveFinalizer(pvb, PodVolumeFinalizer)
return true
}); err != nil {
log.WithError(err).Error("error to remove finalizer")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
if pvb.Spec.Cancel {
if spotted, found := r.cancelledPVB[pvb.Name]; !found {
r.cancelledPVB[pvb.Name] = r.clock.Now()
} else {
delay := cancelDelayOthers
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
delay = cancelDelayInProgress
}
if time.Since(spotted) > delay {
log.Infof("PVB %s is canceled in Phase %s but not handled in reasonable time", pvb.GetName(), pvb.Status.Phase)
if r.tryCancelPodVolumeBackup(ctx, pvb, "") {
delete(r.cancelledPVB, pvb.Name)
}
return ctrl.Result{}, nil
}
}
}
if pvb.Status.Phase == "" || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseNew {
if pvb.Spec.Cancel {
log.Infof("PVB %s is canceled in Phase %s", pvb.GetName(), pvb.Status.Phase)
r.tryCancelPodVolumeBackup(ctx, pvb, "")
return ctrl.Result{}, nil
}
// Only process items for this node. // Only process items for this node.
if pvb.Spec.Node != r.nodeName { if pvb.Spec.Node != r.nodeName {
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
switch pvb.Status.Phase { log.Info("Accepting PVB")
case "", velerov1api.PodVolumeBackupPhaseNew:
// Only process new items. if err := r.acceptPodVolumeBackup(ctx, pvb); err != nil {
default: return ctrl.Result{}, errors.Wrapf(err, "error accepting PVB %s", pvb.Name)
log.Debug("PodVolumeBackup is not new, not processing") }
log.Info("Exposing PVB")
exposeParam := r.setupExposeParam(pvb)
if err := r.exposer.Expose(ctx, getPVBOwnerObject(pvb), exposeParam); err != nil {
return r.errorOut(ctx, pvb, err, "error to expose PVB", log)
}
log.Info("PVB is exposed")
return ctrl.Result{}, nil
} else if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted {
if peekErr := r.exposer.PeekExposed(ctx, getPVBOwnerObject(pvb)); peekErr != nil {
log.Errorf("Cancel PVB %s/%s because of expose error %s", pvb.Namespace, pvb.Name, peekErr)
r.tryCancelPodVolumeBackup(ctx, pvb, fmt.Sprintf("found a PVB %s/%s with expose error: %s. mark it as cancel", pvb.Namespace, pvb.Name, peekErr))
} else if pvb.Status.AcceptedTimestamp != nil {
if time.Since(pvb.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, pvb)
}
}
return ctrl.Result{}, nil
} else if pvb.Status.Phase == velerov1api.PodVolumeBackupPhasePrepared {
log.Infof("PVB is prepared and should be processed by %s (%s)", pvb.Spec.Node, r.nodeName)
if pvb.Spec.Node != r.nodeName {
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
log.Info("PodVolumeBackup starting") if pvb.Spec.Cancel {
log.Info("Prepared PVB is being canceled")
r.OnDataPathCancelled(ctx, pvb.GetNamespace(), pvb.GetName())
return ctrl.Result{}, nil
}
asyncBR := r.dataPathMgr.GetAsyncBR(pvb.Name)
if asyncBR != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
res, err := r.exposer.GetExposed(ctx, getPVBOwnerObject(pvb), r.client, r.nodeName, r.resourceTimeout)
if err != nil {
return r.errorOut(ctx, pvb, err, "exposed PVB is not ready", log)
} else if res == nil {
return r.errorOut(ctx, pvb, errors.New("no expose result is available for the current node"), "exposed PVB is not ready", log)
}
log.Info("Exposed PVB is ready and creating data path routine")
callbacks := datapath.Callbacks{ callbacks := datapath.Callbacks{
OnCompleted: r.OnDataPathCompleted, OnCompleted: r.OnDataPathCompleted,
@ -130,116 +274,245 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
OnProgress: r.OnDataPathProgress, OnProgress: r.OnDataPathProgress,
} }
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, pVBRRequestor, ctx, r.Client, pvb.Namespace, callbacks, log) asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup,
pvb.Name, pvb.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvb.Name, callbacks, false, log)
if err != nil { if err != nil {
if err == datapath.ConcurrentLimitExceed { if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else { } else {
return r.errorOut(ctx, &pvb, err, "error to create data path", log) return r.errorOut(ctx, pvb, err, "error to create data path", log)
} }
} }
r.metrics.RegisterPodVolumeBackupEnqueue(r.nodeName) r.metrics.RegisterPodVolumeBackupEnqueue(r.nodeName)
// Update status to InProgress. if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil {
original := pvb.DeepCopy() log.WithError(err).Errorf("Failed to init cancelable data path for %s", pvb.Name)
r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, pvb, err, "error initializing data path", log)
}
terminated := false
if err := UpdatePVBWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if isPVBInFinalState(pvb) {
terminated = true
return false
}
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress
pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil {
r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, &pvb, err, "error updating PodVolumeBackup status", log)
}
var pod corev1api.Pod return true
podNamespacedName := client.ObjectKey{
Namespace: pvb.Spec.Pod.Namespace,
Name: pvb.Spec.Pod.Name,
}
if err := r.Client.Get(ctx, podNamespacedName, &pod); err != nil {
r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log)
}
path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.kubeClient, r.fileSystem, log)
if err != nil {
r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, &pvb, err, "error exposing host path for pod volume", log)
}
log.WithField("path", path.ByPath).Debugf("Found host path")
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvb.Spec.BackupStorageLocation,
SourceNamespace: pvb.Spec.Pod.Namespace,
UploaderType: pvb.Spec.UploaderType,
RepositoryType: podvolume.GetPvbRepositoryType(&pvb),
RepoIdentifier: pvb.Spec.RepoIdentifier,
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil { }); err != nil {
log.WithError(err).Warnf("Failed to update PVB %s to InProgress, will data path close and retry", pvb.Name)
r.closeDataPath(ctx, pvb.Name) r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, &pvb, err, "error to initialize data path", log) return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} }
// If this is a PVC, look for the most recent completed pod volume backup for it and get if terminated {
// its snapshot ID to do new backup based on it. Without this, log.Warnf("PVB %s is terminated during transition from prepared", pvb.Name)
// if the pod using the PVC (and therefore the directory path under /host_pods/) has
// changed since the PVC's last backup, for backup, it will not be able to identify a suitable
// parent snapshot to use, and will have to do a full rescan of the contents of the PVC.
var parentSnapshotID string
if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok {
parentSnapshotID = r.getParentSnapshot(ctx, log, pvcUID, &pvb)
if parentSnapshotID == "" {
log.Info("No parent snapshot found for PVC, not based on parent snapshot for this backup")
} else {
log.WithField("parentSnapshotID", parentSnapshotID).Info("Based on parent snapshot for this backup")
}
}
if err := fsBackup.StartBackup(path, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
RealSource: "",
ParentSnapshot: parentSnapshotID,
ForceFull: false,
Tags: pvb.Spec.Tags,
}); err != nil {
r.closeDataPath(ctx, pvb.Name) r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log) return ctrl.Result{}, nil
} }
log.WithField("path", path.ByPath).Info("Async fs backup data path started") log.Info("PVB is marked as in progress")
if err := r.startCancelableDataPath(asyncBR, pvb, res, log); err != nil {
log.WithError(err).Errorf("Failed to start cancelable data path for %s", pvb.Name)
r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, pvb, err, "error starting data path", log)
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
if pvb.Spec.Cancel {
if pvb.Spec.Node != r.nodeName {
return ctrl.Result{}, nil
}
log.Info("In progress PVB is being canceled")
asyncBR := r.dataPathMgr.GetAsyncBR(pvb.Name)
if asyncBR == nil {
r.OnDataPathCancelled(ctx, pvb.GetNamespace(), pvb.GetName())
return ctrl.Result{}, nil
}
// Update status to Canceling
if err := UpdatePVBWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if isPVBInFinalState(pvb) {
log.Warnf("PVB %s is terminated, abort setting it to canceling", pvb.Name)
return false
}
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCanceling
return true
}); err != nil {
log.WithError(err).Error("error updating PVB into canceling status")
return ctrl.Result{}, err
}
asyncBR.Cancel()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
// acceptPodVolumeBackup transitions the given PVB to the Accepted phase and
// records the acceptance time, retrying on update conflicts.
func (r *PodVolumeBackupReconciler) acceptPodVolumeBackup(ctx context.Context, pvb *velerov1api.PodVolumeBackup) error {
	key := types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}
	markAccepted := func(obj *velerov1api.PodVolumeBackup) bool {
		obj.Status.AcceptedTimestamp = &metav1.Time{Time: r.clock.Now()}
		obj.Status.Phase = velerov1api.PodVolumeBackupPhaseAccepted
		return true
	}
	return UpdatePVBWithRetry(ctx, r.client, key, r.logger, markAccepted)
}
// tryCancelPodVolumeBackup attempts to move the PVB into the terminal Canceled
// phase via an exclusive (conflict-aware) update and, on success, cleans up the
// exposed resources. It returns false when the update errored or lost a
// conflict, so the caller can retry the cancellation later.
func (r *PodVolumeBackupReconciler) tryCancelPodVolumeBackup(ctx context.Context, pvb *velerov1api.PodVolumeBackup, message string) bool {
	log := r.logger.WithField("PVB", pvb.Name)
	succeeded, err := funcExclusiveUpdatePodVolumeBackup(ctx, r.client, pvb, func(pvb *velerov1api.PodVolumeBackup) {
		pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCanceled
		// The PVB may be canceled before it ever started; backfill the start
		// time so start/completion timestamps stay consistent.
		if pvb.Status.StartTimestamp.IsZero() {
			pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
		}
		pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
		if message != "" {
			pvb.Status.Message = message
		}
	})
	if err != nil {
		log.WithError(err).Error("error updating PVB status")
		return false
	} else if !succeeded {
		// Lost the exclusive update (conflict); leave the PVB untouched and
		// let a later reconcile retry.
		log.Warn("conflict in updating PVB status and will try it again later")
		return false
	}
	// Tear down the exposed environment only after the Canceled status is
	// durably persisted.
	r.exposer.CleanUp(ctx, getPVBOwnerObject(pvb))
	log.Warn("PVB is canceled")
	return true
}
// funcExclusiveUpdatePodVolumeBackup is an indirection point so tests can stub
// the exclusive-update behavior.
var funcExclusiveUpdatePodVolumeBackup = exclusiveUpdatePodVolumeBackup

// exclusiveUpdatePodVolumeBackup applies updateFunc to pvb and writes it back
// exactly once. It reports (false, nil) on an optimistic-concurrency conflict
// (someone else updated the object first) and (false, err) on any other error.
func exclusiveUpdatePodVolumeBackup(ctx context.Context, cli client.Client, pvb *velerov1api.PodVolumeBackup, updateFunc func(*velerov1api.PodVolumeBackup)) (bool, error) {
	updateFunc(pvb)
	switch err := cli.Update(ctx, pvb); {
	case err == nil:
		return true, nil
	case apierrors.IsConflict(err):
		return false, nil
	default:
		return false, err
	}
}
// onPrepareTimeout handles a PVB that stayed in the preparing stage too long:
// it fails the PVB with an exclusive update (so a concurrent legitimate
// transition wins instead), dumps expose diagnostics to the log, and cleans up
// the exposed resources.
func (r *PodVolumeBackupReconciler) onPrepareTimeout(ctx context.Context, pvb *velerov1api.PodVolumeBackup) {
	log := r.logger.WithField("PVB", pvb.Name)
	log.Info("Timeout happened for preparing PVB")
	succeeded, err := funcExclusiveUpdatePodVolumeBackup(ctx, r.client, pvb, func(pvb *velerov1api.PodVolumeBackup) {
		pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
		pvb.Status.Message = "timeout on preparing PVB"
	})
	if err != nil {
		log.WithError(err).Warn("Failed to update PVB")
		return
	}
	if !succeeded {
		// Another writer updated the PVB concurrently; defer to that
		// transition and do not clean up here.
		log.Warn("PVB has been updated by others")
		return
	}
	// Capture diagnostics line by line before tearing the expose environment
	// down, since the information is gone after CleanUp.
	diags := strings.Split(r.exposer.DiagnoseExpose(ctx, getPVBOwnerObject(pvb)), "\n")
	for _, diag := range diags {
		log.Warnf("[Diagnose PVB expose]%s", diag)
	}
	r.exposer.CleanUp(ctx, getPVBOwnerObject(pvb))
	log.Info("PVB has been cleaned up")
}
// initCancelableDataPath initializes the async (cancelable) backup/restore
// instance for the exposed pod volume.
func (r *PodVolumeBackupReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
	log.Info("Init cancelable PVB")
	err := asyncBR.Init(ctx, nil)
	if err != nil {
		return errors.Wrap(err, "error initializing asyncBR")
	}
	log.Infof("async data path init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
	return nil
}
func (r *PodVolumeBackupReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, pvb *velerov1api.PodVolumeBackup, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Start cancelable PVB")
if err := asyncBR.StartBackup(datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, pvb.Spec.UploaderSettings, nil); err != nil {
return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
}
log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
} }
func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) { func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) {
defer r.dataPathMgr.RemoveAsyncBR(pvbName) defer r.dataPathMgr.RemoveAsyncBR(pvbName)
log := r.logger.WithField("pvb", pvbName) log := r.logger.WithField("PVB", pvbName)
log.WithField("PVB", pvbName).Info("Async fs backup data path completed") log.WithField("PVB", pvbName).Info("Async fs backup data path completed")
var pvb velerov1api.PodVolumeBackup pvb := &velerov1api.PodVolumeBackup{}
if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil { if err := r.client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, pvb); err != nil {
log.WithError(err).Warn("Failed to get PVB on completion") log.WithError(err).Warn("Failed to get PVB on completion")
return return
} }
log.Info("Cleaning up exposed environment")
r.exposer.CleanUp(ctx, getPVBOwnerObject(pvb))
// Update status to Completed with path & snapshot ID. // Update status to Completed with path & snapshot ID.
original := pvb.DeepCopy() var completionTime metav1.Time
if err := UpdatePVBWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log, func(pvb *velerov1api.PodVolumeBackup) bool {
completionTime = metav1.Time{Time: r.clock.Now()}
if isPVBInFinalState(pvb) {
return false
}
pvb.Status.Path = result.Backup.Source.ByPath pvb.Status.Path = result.Backup.Source.ByPath
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
pvb.Status.SnapshotID = result.Backup.SnapshotID pvb.Status.SnapshotID = result.Backup.SnapshotID
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} pvb.Status.CompletionTimestamp = &completionTime
if result.Backup.EmptySnapshot { if result.Backup.EmptySnapshot {
pvb.Status.Message = "volume was empty so no snapshot was taken" pvb.Status.Message = "volume was empty so no snapshot was taken"
} }
if err := r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil { return true
log.WithError(err).Error("error updating PodVolumeBackup status") }); err != nil {
} log.WithError(err).Error("error updating PVB status")
} else {
latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time) latencyDuration := completionTime.Time.Sub(pvb.Status.StartTimestamp.Time)
latencySeconds := float64(latencyDuration / time.Second) latencySeconds := float64(latencyDuration / time.Second)
backupName := fmt.Sprintf("%s/%s", pvb.Namespace, pvb.OwnerReferences[0].Name) backupName := fmt.Sprintf("%s/%s", pvb.Namespace, pvb.OwnerReferences[0].Name)
generateOpName := fmt.Sprintf("%s-%s-%s-%s-backup", pvb.Name, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType) generateOpName := fmt.Sprintf("%s-%s-%s-%s-backup", pvb.Name, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType)
@ -247,18 +520,19 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam
r.metrics.RegisterPodVolumeOpLatencyGauge(r.nodeName, pvb.Name, generateOpName, backupName, latencySeconds) r.metrics.RegisterPodVolumeOpLatencyGauge(r.nodeName, pvb.Name, generateOpName, backupName, latencySeconds)
r.metrics.RegisterPodVolumeBackupDequeue(r.nodeName) r.metrics.RegisterPodVolumeBackupDequeue(r.nodeName)
log.Info("PodVolumeBackup completed") log.Info("PVB completed")
}
} }
func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) { func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) {
defer r.dataPathMgr.RemoveAsyncBR(pvbName) defer r.dataPathMgr.RemoveAsyncBR(pvbName)
log := r.logger.WithField("pvb", pvbName) log := r.logger.WithField("PVB", pvbName)
log.WithError(err).Error("Async fs backup data path failed") log.WithError(err).Error("Async fs backup data path failed")
var pvb velerov1api.PodVolumeBackup var pvb velerov1api.PodVolumeBackup
if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil { if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVB on failure") log.WithError(getErr).Warn("Failed to get PVB on failure")
} else { } else {
_, _ = r.errorOut(ctx, &pvb, err, "data path backup failed", log) _, _ = r.errorOut(ctx, &pvb, err, "data path backup failed", log)
@ -268,117 +542,186 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp
func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) { func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) {
defer r.dataPathMgr.RemoveAsyncBR(pvbName) defer r.dataPathMgr.RemoveAsyncBR(pvbName)
log := r.logger.WithField("pvb", pvbName) log := r.logger.WithField("PVB", pvbName)
log.Warn("Async fs backup data path canceled") log.Warn("Async fs backup data path canceled")
var pvb velerov1api.PodVolumeBackup var pvb velerov1api.PodVolumeBackup
if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil { if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVB on cancel") log.WithError(getErr).Warn("Failed to get PVB on cancel")
return
}
// cleans up any objects generated during the snapshot expose
r.exposer.CleanUp(ctx, getPVBOwnerObject(&pvb))
if err := UpdatePVBWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if isPVBInFinalState(pvb) {
return false
}
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCanceled
if pvb.Status.StartTimestamp.IsZero() {
pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
}
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
return true
}); err != nil {
log.WithError(err).Error("error updating PVB status on cancel")
} else { } else {
_, _ = r.errorOut(ctx, &pvb, errors.New("PVB is canceled"), "data path backup canceled", log) delete(r.cancelledPVB, pvb.Name)
} }
} }
func (r *PodVolumeBackupReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvbName string, progress *uploader.Progress) { func (r *PodVolumeBackupReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvbName string, progress *uploader.Progress) {
log := r.logger.WithField("pvb", pvbName) log := r.logger.WithField("pvb", pvbName)
var pvb velerov1api.PodVolumeBackup if err := UpdatePVBWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: pvbName}, log, func(pvb *velerov1api.PodVolumeBackup) bool {
if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil {
log.WithError(err).Warn("Failed to get PVB on progress")
return
}
original := pvb.DeepCopy()
pvb.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} pvb.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
return true
if err := r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil { }); err != nil {
log.WithError(err).Error("Failed to update progress") log.WithError(err).Error("Failed to update progress")
} }
} }
// SetupWithManager registers the PVB controller. // SetupWithManager registers the PVB controller.
func (r *PodVolumeBackupReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *PodVolumeBackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
pvb := object.(*velerov1api.PodVolumeBackup)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted {
return true
}
if pvb.Spec.Cancel && !isPVBInFinalState(pvb) {
return true
}
if isPVBInFinalState(pvb) && !pvb.DeletionTimestamp.IsZero() {
return true
}
return false
})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerPodVolumeBackup), r.client, &velerov1api.PodVolumeBackupList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{
Predicates: []predicate.Predicate{gp},
})
return ctrl.NewControllerManagedBy(mgr). return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.PodVolumeBackup{}). For(&velerov1api.PodVolumeBackup{}).
WatchesRawSource(s).
Watches(&corev1api.Pod{}, kube.EnqueueRequestsFromMapUpdateFunc(r.findPVBForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
newObj := ue.ObjectNew.(*corev1api.Pod)
if _, ok := newObj.Labels[velerov1api.PVBLabel]; !ok {
return false
}
if newObj.Spec.NodeName == "" {
return false
}
return true
},
CreateFunc: func(event.CreateEvent) bool {
return false
},
DeleteFunc: func(de event.DeleteEvent) bool {
return false
},
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
})).
Complete(r) Complete(r)
} }
// getParentSnapshot finds the most recent completed PodVolumeBackup for the func (r *PodVolumeBackupReconciler) findPVBForPod(ctx context.Context, podObj client.Object) []reconcile.Request {
// specified PVC and returns its snapshot ID. Any errors encountered are pod := podObj.(*corev1api.Pod)
// logged but not returned since they do not prevent a backup from proceeding. pvb, err := findPVBByPod(r.client, *pod)
func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log logrus.FieldLogger, pvcUID string, podVolumeBackup *velerov1api.PodVolumeBackup) string {
log = log.WithField("pvcUID", pvcUID)
log.Infof("Looking for most recent completed PodVolumeBackup for this PVC")
listOpts := &client.ListOptions{ log := r.logger.WithField("pod", pod.Name)
Namespace: podVolumeBackup.Namespace, if err != nil {
log.WithError(err).Error("unable to get PVB")
return []reconcile.Request{}
} else if pvb == nil {
log.Error("get empty PVB")
return []reconcile.Request{}
} }
matchingLabels := client.MatchingLabels(map[string]string{velerov1api.PVCUIDLabel: pvcUID}) log = log.WithFields(logrus.Fields{
matchingLabels.ApplyToList(listOpts) "PVB": pvb.Name,
})
var pvbList velerov1api.PodVolumeBackupList if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseAccepted {
if err := r.Client.List(ctx, &pvbList, listOpts); err != nil { return []reconcile.Request{}
log.WithError(errors.WithStack(err)).Error("getting list of podvolumebackups for this PVC")
} }
// Go through all the podvolumebackups for the PVC and look for the most if pod.Status.Phase == corev1api.PodRunning {
// recent completed one to use as the parent. log.Info("Preparing PVB")
var mostRecentPVB velerov1api.PodVolumeBackup
for _, pvb := range pvbList.Items { if err = UpdatePVBWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log,
if pvb.Spec.UploaderType != podVolumeBackup.Spec.UploaderType { func(pvb *velerov1api.PodVolumeBackup) bool {
continue if isPVBInFinalState(pvb) {
} log.Warnf("PVB %s is terminated, abort setting it to prepared", pvb.Name)
if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted { return false
continue
} }
if podVolumeBackup.Spec.BackupStorageLocation != pvb.Spec.BackupStorageLocation { pvb.Status.Phase = velerov1api.PodVolumeBackupPhasePrepared
// Check the backup storage location is the same as spec in order to return true
// support backup to multiple backup-locations. Otherwise, there exists }); err != nil {
// a case that backup volume snapshot to the second location would log.WithError(err).Warn("Failed to update PVB, prepare will halt for this PVB")
// failed, since the founded parent ID is only valid for the first return []reconcile.Request{}
// backup location, not the second backup location. Also, the second }
// backup should not use the first backup parent ID since its for the } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
// first backup location only. err := UpdatePVBWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log,
continue func(pvb *velerov1api.PodVolumeBackup) bool {
if pvb.Spec.Cancel {
return false
} }
if mostRecentPVB.Status == (velerov1api.PodVolumeBackupStatus{}) || pvb.Status.StartTimestamp.After(mostRecentPVB.Status.StartTimestamp.Time) { pvb.Spec.Cancel = true
mostRecentPVB = pvb pvb.Status.Message = fmt.Sprintf("Cancel PVB because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
return true
})
if err != nil {
log.WithError(err).Warn("failed to cancel PVB, and it will wait for prepare timeout")
return []reconcile.Request{}
} }
log.Infof("Exposed pod is in abnormal status(reason %s) and PVB is marked as cancel", reason)
} else {
return []reconcile.Request{}
} }
if mostRecentPVB.Status == (velerov1api.PodVolumeBackupStatus{}) { request := reconcile.Request{
log.Info("No completed PodVolumeBackup found for PVC") NamespacedName: types.NamespacedName{
return "" Namespace: pvb.Namespace,
Name: pvb.Name,
},
} }
return []reconcile.Request{request}
log.WithFields(map[string]any{
"parentPodVolumeBackup": mostRecentPVB.Name,
"parentSnapshotID": mostRecentPVB.Status.SnapshotID,
}).Info("Found most recent completed PodVolumeBackup for PVC")
return mostRecentPVB.Status.SnapshotID
}
// closeDataPath closes the in-flight async backup instance registered for the
// given PVB, if any, and removes it from the data path manager so its
// concurrency slot is released.
func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName string) {
	fsBackup := r.dataPathMgr.GetAsyncBR(pvbName)
	if fsBackup != nil {
		fsBackup.Close(ctx)
	}
	// Remove unconditionally: the entry may exist even if Close was skipped.
	r.dataPathMgr.RemoveAsyncBR(pvbName)
}
func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, err, msg, r.clock.Now(), log) r.exposer.CleanUp(ctx, getPVBOwnerObject(pvb))
_ = UpdatePVBStatusToFailed(ctx, r.client, pvb, err, msg, r.clock.Now(), log)
return ctrl.Result{}, err return ctrl.Result{}, err
} }
func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errOut error, msg string, time time.Time, log logrus.FieldLogger) error { func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errOut error, msg string, time time.Time, log logrus.FieldLogger) error {
original := pvb.DeepCopy() log.Info("update PVB status to Failed")
if patchErr := UpdatePVBWithRetry(context.Background(), c, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log,
func(pvb *velerov1api.PodVolumeBackup) bool {
if isPVBInFinalState(pvb) {
return false
}
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.CompletionTimestamp = &metav1.Time{Time: time} pvb.Status.CompletionTimestamp = &metav1.Time{Time: time}
if dataPathError, ok := errOut.(datapath.DataPathError); ok { if dataPathError, ok := errOut.(datapath.DataPathError); ok {
@ -389,10 +732,115 @@ func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1
} else { } else {
pvb.Status.Message = errors.WithMessage(errOut, msg).Error() pvb.Status.Message = errors.WithMessage(errOut, msg).Error()
} }
err := c.Patch(ctx, pvb, client.MergeFrom(original)) if pvb.Status.StartTimestamp.IsZero() {
if err != nil { pvb.Status.StartTimestamp = &metav1.Time{Time: time}
log.WithError(err).Error("error updating PodVolumeBackup status")
} }
return err return true
}); patchErr != nil {
log.WithError(patchErr).Warn("error updating PVB status")
}
return errOut
}
// closeDataPath closes the async backup/restore instance registered for the
// given PVB, if any, and removes it from the data path manager so its
// concurrency slot is released.
func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName string) {
	asyncBR := r.dataPathMgr.GetAsyncBR(pvbName)
	if asyncBR != nil {
		asyncBR.Close(ctx)
	}
	// Remove unconditionally: the entry may exist even if Close was skipped.
	r.dataPathMgr.RemoveAsyncBR(pvbName)
}
// setupExposeParam assembles the parameters used to expose the client pod's
// volume for backup, propagating any third-party labels/annotations configured
// on the node-agent onto the hosting pod.
func (r *PodVolumeBackupReconciler) setupExposeParam(pvb *velerov1api.PodVolumeBackup) exposer.PodVolumeExposeParam {
	log := r.logger.WithField("PVB", pvb.Name)
	// Label the hosting pod with the PVB name so pod events can be mapped back
	// to their PVB (see findPVBByPod).
	hostingPodLabels := map[string]string{velerov1api.PVBLabel: pvb.Name}
	for _, k := range util.ThirdPartyLabels {
		if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, pvb.Namespace, k, ""); err != nil {
			// A missing label is expected; only warn on other errors.
			if err != nodeagent.ErrNodeAgentLabelNotFound {
				log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
			}
		} else {
			hostingPodLabels[k] = v
		}
	}
	hostingPodAnnotation := map[string]string{}
	for _, k := range util.ThirdPartyAnnotations {
		if v, err := nodeagent.GetAnnotationValue(context.Background(), r.kubeClient, pvb.Namespace, k, ""); err != nil {
			// A missing annotation is expected; only warn on other errors.
			if err != nodeagent.ErrNodeAgentAnnotationNotFound {
				log.WithError(err).Warnf("Failed to check node-agent annotation, skip adding host pod annotation %s", k)
			}
		} else {
			hostingPodAnnotation[k] = v
		}
	}
	return exposer.PodVolumeExposeParam{
		Type:                  exposer.PodVolumeExposeTypeBackup,
		ClientNamespace:       pvb.Spec.Pod.Namespace,
		ClientPodName:         pvb.Spec.Pod.Name,
		ClientPodVolume:       pvb.Spec.Volume,
		HostingPodLabels:      hostingPodLabels,
		HostingPodAnnotations: hostingPodAnnotation,
		OperationTimeout:      r.resourceTimeout,
		Resources:             r.podResources,
	}
}
// getPVBOwnerObject builds the object reference identifying the PVB as the
// owner of its exposed resources.
func getPVBOwnerObject(pvb *velerov1api.PodVolumeBackup) corev1api.ObjectReference {
	ref := corev1api.ObjectReference{
		APIVersion: pvb.APIVersion,
		Kind:       pvb.Kind,
		Namespace:  pvb.Namespace,
		Name:       pvb.Name,
		UID:        pvb.UID,
	}
	return ref
}
// findPVBByPod resolves the PVB that owns the given hosting pod via the PVB
// label on the pod. It returns (nil, nil) when the pod carries no PVB label.
func findPVBByPod(client client.Client, pod corev1api.Pod) (*velerov1api.PodVolumeBackup, error) {
	label, exist := pod.Labels[velerov1api.PVBLabel]
	if !exist {
		return nil, nil
	}
	pvb := &velerov1api.PodVolumeBackup{}
	key := types.NamespacedName{
		Namespace: pod.Namespace,
		Name:      label,
	}
	if err := client.Get(context.Background(), key, pvb); err != nil {
		return nil, errors.Wrapf(err, "error to find PVB by pod %s/%s", pod.Namespace, pod.Name)
	}
	return pvb, nil
}
// isPVBInFinalState reports whether the PVB has reached a terminal phase
// (Failed, Canceled, or Completed) and must not be transitioned further.
func isPVBInFinalState(pvb *velerov1api.PodVolumeBackup) bool {
	switch pvb.Status.Phase {
	case velerov1api.PodVolumeBackupPhaseFailed,
		velerov1api.PodVolumeBackupPhaseCanceled,
		velerov1api.PodVolumeBackupPhaseCompleted:
		return true
	default:
		return false
	}
}
func UpdatePVBWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov1api.PodVolumeBackup) bool) error {
return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) {
pvb := &velerov1api.PodVolumeBackup{}
if err := client.Get(ctx, namespacedName, pvb); err != nil {
return false, errors.Wrap(err, "getting PVB")
}
if updateFunc(pvb) {
err := client.Update(ctx, pvb)
if err != nil {
if apierrors.IsConflict(err) {
log.Warnf("failed to update PVB for %s/%s and will retry it", pvb.Namespace, pvb.Name)
return false, nil
} else {
return false, errors.Wrapf(err, "error updating PVB with error %s/%s", pvb.Namespace, pvb.Name)
}
}
}
return true, nil
})
} }

File diff suppressed because it is too large Load Diff

View File

@ -169,7 +169,8 @@ func newBackupper(
} }
if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted && if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted &&
pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed { pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed &&
pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCanceled {
return return
} }
@ -179,7 +180,8 @@ func newBackupper(
existPVB, ok := existObj.(*velerov1api.PodVolumeBackup) existPVB, ok := existObj.(*velerov1api.PodVolumeBackup)
// the PVB in the indexer is already in final status, no need to call WaitGroup.Done() // the PVB in the indexer is already in final status, no need to call WaitGroup.Done()
if ok && (existPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || if ok && (existPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
existPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed) { existPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCanceled) {
statusChangedToFinal = false statusChangedToFinal = false
} }
} }
@ -428,7 +430,7 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
continue continue
} }
podVolumeBackups = append(podVolumeBackups, pvb) podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCanceled {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message) log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
} }
} }