vgdp ms pvr controller

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/9014/head
Lyndon-Li 2025-06-06 17:22:08 +08:00
parent b58dbcb0b8
commit 1f5436fe91
16 changed files with 1901 additions and 243 deletions

View File

@ -15,39 +15,40 @@ spec:
scope: Namespaced scope: Namespaced
versions: versions:
- additionalPrinterColumns: - additionalPrinterColumns:
- description: Namespace of the pod containing the volume to be restored - description: PodVolumeRestore status such as New/InProgress
jsonPath: .spec.pod.namespace jsonPath: .status.phase
name: Namespace name: Status
type: string type: string
- description: Name of the pod containing the volume to be restored - description: Time duration since this PodVolumeRestore was started
jsonPath: .spec.pod.name jsonPath: .status.startTimestamp
name: Pod name: Started
type: date
- description: Completed bytes
format: int64
jsonPath: .status.progress.bytesDone
name: Bytes Done
type: integer
- description: Total bytes
format: int64
jsonPath: .status.progress.totalBytes
name: Total Bytes
type: integer
- description: Name of the Backup Storage Location where the backup data is stored
jsonPath: .spec.backupStorageLocation
name: Storage Location
type: string
- description: Time duration since this PodVolumeRestore was created
jsonPath: .metadata.creationTimestamp
name: Age
type: date
- description: Name of the node where the PodVolumeRestore is processed
jsonPath: .status.node
name: Node
type: string type: string
- description: The type of the uploader to handle data transfer - description: The type of the uploader to handle data transfer
jsonPath: .spec.uploaderType jsonPath: .spec.uploaderType
name: Uploader Type name: Uploader Type
type: string type: string
- description: Name of the volume to be restored
jsonPath: .spec.volume
name: Volume
type: string
- description: Pod Volume Restore status such as New/InProgress
jsonPath: .status.phase
name: Status
type: string
- description: Pod Volume Restore status such as New/InProgress
format: int64
jsonPath: .status.progress.totalBytes
name: TotalBytes
type: integer
- description: Pod Volume Restore status such as New/InProgress
format: int64
jsonPath: .status.progress.bytesDone
name: BytesDone
type: integer
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1 name: v1
schema: schema:
openAPIV3Schema: openAPIV3Schema:
@ -77,6 +78,11 @@ spec:
BackupStorageLocation is the name of the backup storage location BackupStorageLocation is the name of the backup storage location
where the backup repository is stored. where the backup repository is stored.
type: string type: string
cancel:
description: |-
Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
when the PodVolumeRestore is in InProgress phase
type: boolean
pod: pod:
description: Pod is a reference to the pod containing the volume to description: Pod is a reference to the pod containing the volume to
be restored. be restored.
@ -162,6 +168,13 @@ spec:
status: status:
description: PodVolumeRestoreStatus is the current status of a PodVolumeRestore. description: PodVolumeRestoreStatus is the current status of a PodVolumeRestore.
properties: properties:
acceptedTimestamp:
description: |-
AcceptedTimestamp records the time the pod volume restore is to be prepared.
The server's time is used for AcceptedTimestamp
format: date-time
nullable: true
type: string
completionTimestamp: completionTimestamp:
description: |- description: |-
CompletionTimestamp records the time a restore was completed. CompletionTimestamp records the time a restore was completed.
@ -173,11 +186,19 @@ spec:
message: message:
description: Message is a message about the pod volume restore's status. description: Message is a message about the pod volume restore's status.
type: string type: string
node:
description: Node is name of the node where the pod volume restore
is processed.
type: string
phase: phase:
description: Phase is the current state of the PodVolumeRestore. description: Phase is the current state of the PodVolumeRestore.
enum: enum:
- New - New
- Accepted
- Prepared
- InProgress - InProgress
- Canceling
- Canceled
- Completed - Completed
- Failed - Failed
type: string type: string

File diff suppressed because one or more lines are too long

View File

@ -101,6 +101,9 @@ const (
// ExcludeFromBackupLabel is the label to exclude k8s resource from backup, // ExcludeFromBackupLabel is the label to exclude k8s resource from backup,
// even if the resource contains a matching selector label. // even if the resource contains a matching selector label.
ExcludeFromBackupLabel = "velero.io/exclude-from-backup" ExcludeFromBackupLabel = "velero.io/exclude-from-backup"
// PVRLabel is the label key used to identify the pvb for pvr pod
PVRLabel = "velero.io/pod-volume-restore"
) )
type AsyncOperationIDPrefix string type AsyncOperationIDPrefix string

View File

@ -54,15 +54,23 @@ type PodVolumeRestoreSpec struct {
// +optional // +optional
// +nullable // +nullable
UploaderSettings map[string]string `json:"uploaderSettings,omitempty"` UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
// Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
// when the PodVolumeRestore is in InProgress phase
Cancel bool `json:"cancel,omitempty"`
} }
// PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore. // PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.
// +kubebuilder:validation:Enum=New;InProgress;Completed;Failed // +kubebuilder:validation:Enum=New;Accepted;Prepared;InProgress;Canceling;Canceled;Completed;Failed
type PodVolumeRestorePhase string type PodVolumeRestorePhase string
const ( const (
PodVolumeRestorePhaseNew PodVolumeRestorePhase = "New" PodVolumeRestorePhaseNew PodVolumeRestorePhase = "New"
PodVolumeRestorePhaseAccepted PodVolumeRestorePhase = "Accepted"
PodVolumeRestorePhasePrepared PodVolumeRestorePhase = "Prepared"
PodVolumeRestorePhaseInProgress PodVolumeRestorePhase = "InProgress" PodVolumeRestorePhaseInProgress PodVolumeRestorePhase = "InProgress"
PodVolumeRestorePhaseCanceling PodVolumeRestorePhase = "Canceling"
PodVolumeRestorePhaseCanceled PodVolumeRestorePhase = "Canceled"
PodVolumeRestorePhaseCompleted PodVolumeRestorePhase = "Completed" PodVolumeRestorePhaseCompleted PodVolumeRestorePhase = "Completed"
PodVolumeRestorePhaseFailed PodVolumeRestorePhase = "Failed" PodVolumeRestorePhaseFailed PodVolumeRestorePhase = "Failed"
) )
@ -95,6 +103,16 @@ type PodVolumeRestoreStatus struct {
// about the restore operation. // about the restore operation.
// +optional // +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"` Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// AcceptedTimestamp records the time the pod volume restore is to be prepared.
// The server's time is used for AcceptedTimestamp
// +optional
// +nullable
AcceptedTimestamp *metav1.Time `json:"acceptedTimestamp,omitempty"`
// Node is name of the node where the pod volume restore is processed.
// +optional
Node string `json:"node,omitempty"`
} }
// TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed. // TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
@ -103,14 +121,14 @@ type PodVolumeRestoreStatus struct {
// +kubebuilder:object:generate=true // +kubebuilder:object:generate=true
// +kubebuilder:object:root=true // +kubebuilder:object:root=true
// +kubebuilder:storageversion // +kubebuilder:storageversion
// +kubebuilder:printcolumn:name="Namespace",type="string",JSONPath=".spec.pod.namespace",description="Namespace of the pod containing the volume to be restored" // +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="PodVolumeRestore status such as New/InProgress"
// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".spec.pod.name",description="Name of the pod containing the volume to be restored" // +kubebuilder:printcolumn:name="Started",type="date",JSONPath=".status.startTimestamp",description="Time duration since this PodVolumeRestore was started"
// +kubebuilder:printcolumn:name="Bytes Done",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Completed bytes"
// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where the backup data is stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this PodVolumeRestore was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the PodVolumeRestore is processed"
// +kubebuilder:printcolumn:name="Uploader Type",type="string",JSONPath=".spec.uploaderType",description="The type of the uploader to handle data transfer" // +kubebuilder:printcolumn:name="Uploader Type",type="string",JSONPath=".spec.uploaderType",description="The type of the uploader to handle data transfer"
// +kubebuilder:printcolumn:name="Volume",type="string",JSONPath=".spec.volume",description="Name of the volume to be restored"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="TotalBytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="BytesDone",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type PodVolumeRestore struct { type PodVolumeRestore struct {
metav1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`

View File

@ -1149,6 +1149,10 @@ func (in *PodVolumeRestoreStatus) DeepCopyInto(out *PodVolumeRestoreStatus) {
*out = (*in).DeepCopy() *out = (*in).DeepCopy()
} }
out.Progress = in.Progress out.Progress = in.Progress
if in.AcceptedTimestamp != nil {
in, out := &in.AcceptedTimestamp, &out.AcceptedTimestamp
*out = (*in).DeepCopy()
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeRestoreStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeRestoreStatus.

View File

@ -88,6 +88,11 @@ func (b *PodBuilder) InitContainers(containers ...*corev1api.Container) *PodBuil
return b return b
} }
func (b *PodBuilder) InitContainerState(state corev1api.ContainerState) *PodBuilder {
b.object.Status.InitContainerStatuses = append(b.object.Status.InitContainerStatuses, corev1api.ContainerStatus{State: state})
return b
}
func (b *PodBuilder) Containers(containers ...*corev1api.Container) *PodBuilder { func (b *PodBuilder) Containers(containers ...*corev1api.Container) *PodBuilder {
for _, c := range containers { for _, c := range containers {
b.object.Spec.Containers = append(b.object.Spec.Containers, *c) b.object.Spec.Containers = append(b.object.Spec.Containers, *c)

View File

@ -97,3 +97,33 @@ func (b *PodVolumeRestoreBuilder) UploaderType(uploaderType string) *PodVolumeRe
b.object.Spec.UploaderType = uploaderType b.object.Spec.UploaderType = uploaderType
return b return b
} }
// Cancel sets the DataDownload's Cancel.
func (d *PodVolumeRestoreBuilder) Cancel(cancel bool) *PodVolumeRestoreBuilder {
d.object.Spec.Cancel = cancel
return d
}
// AcceptedTimestamp sets the PodVolumeRestore's AcceptedTimestamp.
func (d *PodVolumeRestoreBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) *PodVolumeRestoreBuilder {
d.object.Status.AcceptedTimestamp = acceptedTimestamp
return d
}
// Finalizers sets the PodVolumeRestore's Finalizers.
func (d *PodVolumeRestoreBuilder) Finalizers(finalizers []string) *PodVolumeRestoreBuilder {
d.object.Finalizers = finalizers
return d
}
// Message sets the PodVolumeRestore's Message.
func (d *PodVolumeRestoreBuilder) Message(msg string) *PodVolumeRestoreBuilder {
d.object.Status.Message = msg
return d
}
// Message sets the PodVolumeRestore's Node.
func (d *PodVolumeRestoreBuilder) Node(node string) *PodVolumeRestoreBuilder {
d.object.Status.Node = node
return d
}

View File

@ -306,10 +306,6 @@ func (s *nodeAgentServer) run() {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup) s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
} }
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
var loadAffinity *kube.LoadAffinity var loadAffinity *kube.LoadAffinity
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0] loadAffinity = s.dataPathConfigs.LoadAffinity[0]
@ -332,6 +328,10 @@ func (s *nodeAgentServer) run() {
} }
} }
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
dataUploadReconciler := controller.NewDataUploadReconciler( dataUploadReconciler := controller.NewDataUploadReconciler(
s.mgr.GetClient(), s.mgr.GetClient(),
s.mgr, s.mgr,
@ -525,7 +525,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
continue continue
} }
if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], errors.New("cannot survive from node-agent restart"),
fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed), fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
time.Now(), s.logger); err != nil { time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName()) s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())

View File

@ -198,9 +198,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
if time.Since(spotted) > delay { if time.Since(spotted) > delay {
log.Infof("Data download %s is canceled in Phase %s but not handled in rasonable time", dd.GetName(), dd.Status.Phase) log.Infof("Data download %s is canceled in Phase %s but not handled in rasonable time", dd.GetName(), dd.Status.Phase)
if r.tryCancelDataDownload(ctx, dd, "") { r.tryCancelDataDownload(ctx, dd, "")
delete(r.cancelledDataDownload, dd.Name)
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
@ -526,6 +524,7 @@ func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd *
// success update // success update
r.metrics.RegisterDataDownloadCancel(r.nodeName) r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
delete(r.cancelledDataDownload, dd.Name)
log.Warn("data download is canceled") log.Warn("data download is canceled")

View File

@ -226,9 +226,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if time.Since(spotted) > delay { if time.Since(spotted) > delay {
log.Infof("Data upload %s is canceled in Phase %s but not handled in reasonable time", du.GetName(), du.Status.Phase) log.Infof("Data upload %s is canceled in Phase %s but not handled in reasonable time", du.GetName(), du.Status.Phase)
if r.tryCancelDataUpload(ctx, du, "") { r.tryCancelDataUpload(ctx, du, "")
delete(r.cancelledDataUpload, du.Name)
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
@ -566,6 +564,7 @@ func (r *DataUploadReconciler) tryCancelDataUpload(ctx context.Context, du *vele
r.metrics.RegisterDataUploadCancel(r.nodeName) r.metrics.RegisterDataUploadCancel(r.nodeName)
// cleans up any objects generated during the snapshot expose // cleans up any objects generated during the snapshot expose
r.cleanUp(ctx, du, log) r.cleanUp(ctx, du, log)
delete(r.cancelledDataUpload, du.Name)
log.Warn("data upload is canceled") log.Warn("data upload is canceled")

View File

@ -46,7 +46,10 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/filesystem"
) )
const pVBRRequestor string = "pod-volume-backup-restore" const (
pVBRRequestor = "pod-volume-backup-restore"
PodVolumeFinalizer = "velero.io/pod-volume-finalizer"
)
// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance
func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,10 @@
package controller
import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
func isLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Spec.UploaderType == uploader.ResticType
}

View File

@ -18,21 +18,43 @@ package controller
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"time" "time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1" corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clientgofake "k8s.io/client-go/kubernetes/fake"
clocks "k8s.io/utils/clock" clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks"
"github.com/vmware-tanzu/velero/pkg/restorehelper" "github.com/vmware-tanzu/velero/pkg/restorehelper"
"github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/test"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
) )
func TestShouldProcess(t *testing.T) { func TestShouldProcess(t *testing.T) {
@ -195,11 +217,11 @@ func TestShouldProcess(t *testing.T) {
c := &PodVolumeRestoreReconciler{ c := &PodVolumeRestoreReconciler{
logger: logrus.New(), logger: logrus.New(),
Client: cli, client: cli,
clock: &clocks.RealClock{}, clock: &clocks.RealClock{},
} }
shouldProcess, _, _ := c.shouldProcess(ctx, c.logger, ts.obj) shouldProcess, _, _ := shouldProcess(ctx, c.client, c.logger, ts.obj)
require.Equal(t, ts.shouldProcessed, shouldProcess) require.Equal(t, ts.shouldProcessed, shouldProcess)
}) })
} }
@ -478,7 +500,7 @@ func TestGetInitContainerIndex(t *testing.T) {
} }
} }
func TestFindVolumeRestoresForPod(t *testing.T) { func TestFindPVRForTargetPod(t *testing.T) {
pod := &corev1api.Pod{} pod := &corev1api.Pod{}
pod.UID = "uid" pod.UID = "uid"
@ -488,14 +510,14 @@ func TestFindVolumeRestoresForPod(t *testing.T) {
// no matching PVR // no matching PVR
reconciler := &PodVolumeRestoreReconciler{ reconciler := &PodVolumeRestoreReconciler{
Client: clientBuilder.Build(), client: clientBuilder.Build(),
logger: logrus.New(), logger: logrus.New(),
} }
requests := reconciler.findVolumeRestoresForPod(context.Background(), pod) requests := reconciler.findPVRForTargetPod(context.Background(), pod)
assert.Empty(t, requests) assert.Empty(t, requests)
// contain one matching PVR // contain one matching PVR
reconciler.Client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{ reconciler.client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{
Items: []velerov1api.PodVolumeRestore{ Items: []velerov1api.PodVolumeRestore{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -515,6 +537,902 @@ func TestFindVolumeRestoresForPod(t *testing.T) {
}, },
}, },
}).Build() }).Build()
requests = reconciler.findVolumeRestoresForPod(context.Background(), pod) requests = reconciler.findPVRForTargetPod(context.Background(), pod)
assert.Len(t, requests, 1) assert.Len(t, requests, 1)
} }
const pvrName string = "pvr-1"
func pvrBuilder() *builder.PodVolumeRestoreBuilder {
return builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).
BackupStorageLocation("bsl-loc").
SnapshotID("test-snapshot-id")
}
func initPodVolumeRestoreReconciler(objects []runtime.Object, cliObj []client.Object, needError ...bool) (*PodVolumeRestoreReconciler, error) {
var errs = make([]error, 6)
for k, isError := range needError {
if k == 0 && isError {
errs[0] = fmt.Errorf("Get error")
} else if k == 1 && isError {
errs[1] = fmt.Errorf("Create error")
} else if k == 2 && isError {
errs[2] = fmt.Errorf("Update error")
} else if k == 3 && isError {
errs[3] = fmt.Errorf("Patch error")
} else if k == 4 && isError {
errs[4] = apierrors.NewConflict(velerov1api.Resource("podvolumerestore"), pvrName, errors.New("conflict"))
} else if k == 5 && isError {
errs[5] = fmt.Errorf("List error")
}
}
return initPodVolumeRestoreReconcilerWithError(objects, cliObj, errs...)
}
func initPodVolumeRestoreReconcilerWithError(objects []runtime.Object, cliObj []client.Object, needError ...error) (*PodVolumeRestoreReconciler, error) {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
if err != nil {
return nil, err
}
err = corev1api.AddToScheme(scheme)
if err != nil {
return nil, err
}
fakeClient := &FakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(cliObj...).Build(),
}
for k := range needError {
if k == 0 {
fakeClient.getError = needError[0]
} else if k == 1 {
fakeClient.createError = needError[1]
} else if k == 2 {
fakeClient.updateError = needError[2]
} else if k == 3 {
fakeClient.patchError = needError[3]
} else if k == 4 {
fakeClient.updateConflict = needError[4]
} else if k == 5 {
fakeClient.listError = needError[5]
}
}
var fakeKubeClient *clientgofake.Clientset
if len(objects) != 0 {
fakeKubeClient = clientgofake.NewSimpleClientset(objects...)
} else {
fakeKubeClient = clientgofake.NewSimpleClientset()
}
fakeFS := velerotest.NewFakeFileSystem()
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "test-uid", "test-pvc")
_, err = fakeFS.Create(pathGlob)
if err != nil {
return nil, err
}
dataPathMgr := datapath.NewManager(1)
return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil
}
func TestPodVolumeRestoreReconcile(t *testing.T) {
daemonSet := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
APIVersion: appsv1api.SchemeGroupVersion.String(),
},
Spec: appsv1api.DaemonSetSpec{
Template: corev1api.PodTemplateSpec{
Spec: corev1api.PodSpec{
Containers: []corev1api.Container{
{
Image: "fake-image",
},
},
},
},
},
}
node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result()
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
notCreatePVR bool
targetPod *corev1api.Pod
dataMgr *datapath.Manager
needErrs []bool
needCreateFSBR bool
needDelete bool
sportTime *metav1.Time
mockExposeErr *bool
isGetExposeErr bool
isGetExposeNil bool
isPeekExposeErr bool
isNilExposer bool
notNilExpose bool
notMockCleanUp bool
mockInit bool
mockInitErr error
mockStart bool
mockStartErr error
mockCancel bool
mockClose bool
needExclusiveUpdateError error
expected *velerov1api.PodVolumeRestore
expectDeleted bool
expectCancelRecord bool
expectedResult *ctrl.Result
expectedErr string
expectDataPath bool
}{
{
name: "pvr not found",
pvr: pvrBuilder().Result(),
notCreatePVR: true,
},
{
name: "pvr not created in velero default namespace",
pvr: builder.ForPodVolumeRestore("test-ns", pvrName).Result(),
},
{
name: "get dd fail",
pvr: builder.ForPodVolumeRestore("test-ns", pvrName).Result(),
needErrs: []bool{true, false, false, false},
expectedErr: "Get error",
},
{
name: "add finalizer to pvr",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "add finalizer to pvr failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result(),
needErrs: []bool{false, false, true, false},
expectedErr: "error updating PVR velero/pvr-1: Update error",
},
{
name: "pvr is under deletion",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
needDelete: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Result(),
},
{
name: "pvr is under deletion but cancel failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
needErrs: []bool{false, false, true, false},
needDelete: true,
expectedErr: "error updating PVR velero/pvr-1: Update error",
},
{
name: "pvr is under deletion and in terminal state",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(),
sportTime: &metav1.Time{Time: time.Now()},
needDelete: true,
expectDeleted: true,
},
{
name: "pvr is under deletion and in terminal state, but remove finalizer failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(),
needErrs: []bool{false, false, true, false},
needDelete: true,
expectedErr: "error updating PVR velero/pvr-1: Update error",
},
{
name: "delay cancel negative for others",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
sportTime: &metav1.Time{Time: time.Now()},
expectCancelRecord: true,
},
{
name: "delay cancel negative for inProgress",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)},
expectCancelRecord: true,
},
{
name: "delay cancel affirmative for others",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "delay cancel affirmative for inProgress",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "delay cancel failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
needErrs: []bool{false, false, true, false},
sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
expectCancelRecord: true,
},
{
name: "Unknown pvr status",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "new pvr but accept failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(),
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
needErrs: []bool{false, false, true, false},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectedErr: "error accepting PVR pvr-1: error updating PVR velero/pvr-1: Update error",
},
{
name: "pvr is cancel on accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "pvr expose failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).PodNamespace("test-ns").PodName("test-pod").Finalizers([]string{PodVolumeFinalizer}).Result(),
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
mockExposeErr: boolptr.True(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Message("error to expose PVR").Result(),
expectedErr: "Error to expose restore exposer",
},
{
name: "pvr succeeds for accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).PodNamespace("test-ns").PodName("test-pod").Finalizers([]string{PodVolumeFinalizer}).Result(),
mockExposeErr: boolptr.False(),
notMockCleanUp: true,
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
},
{
name: "prepare timeout on accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Finalizers([]string{PodVolumeFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Message("timeout on preparing PVR").Result(),
},
{
name: "peek error on accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Finalizers([]string{PodVolumeFinalizer}).Result(),
isPeekExposeErr: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Message("found a PVR velero/pvr-1 with expose error: fake-peek-error. mark it as cancel").Result(),
},
{
name: "cancel on pvr",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Node("test-node").Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "Failed to get restore expose on prepared",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
isGetExposeErr: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("exposed PVR is not ready").Result(),
expectedErr: "Error to get PVR exposer",
},
{
name: "Get nil restore expose on prepared",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
isGetExposeNil: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("exposed PVR is not ready").Result(),
expectedErr: "no expose result is available for the current node",
},
{
name: "Error in data path is concurrent limited",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
dataMgr: datapath.NewManager(0),
notNilExpose: true,
notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path init error",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
mockInit: true,
mockInitErr: errors.New("fake-data-path-init-error"),
mockClose: true,
notNilExpose: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("error initializing data path").Result(),
expectedErr: "error initializing asyncBR: fake-data-path-init-error",
},
{
name: "Unable to update status to in progress for pvr",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
needErrs: []bool{false, false, true, false},
mockInit: true,
mockClose: true,
notNilExpose: true,
notMockCleanUp: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "data path start error",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
mockInit: true,
mockStart: true,
mockStartErr: errors.New("fake-data-path-start-error"),
mockClose: true,
notNilExpose: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("error starting data path").Result(),
expectedErr: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error",
},
{
name: "Prepare succeeds",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
mockInit: true,
mockStart: true,
notNilExpose: true,
notMockCleanUp: true,
expectDataPath: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "In progress pvr is not handled by the current node",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "In progress pvr is not set as cancel",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "Cancel pvr in progress with empty FSBR",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "Cancel pvr in progress and patch pvr error",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
needErrs: []bool{false, false, true, false},
needCreateFSBR: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectedErr: "error updating PVR velero/pvr-1: Update error",
expectCancelRecord: true,
expectDataPath: true,
},
{
name: "Cancel pvr in progress succeeds",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
needCreateFSBR: true,
mockCancel: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceling).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectDataPath: true,
expectCancelRecord: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
objs := []runtime.Object{daemonSet, node}
ctlObj := []client.Object{}
if test.targetPod != nil {
ctlObj = append(ctlObj, test.targetPod)
}
r, err := initPodVolumeRestoreReconciler(objs, ctlObj, test.needErrs...)
require.NoError(t, err)
if !test.notCreatePVR {
err = r.client.Create(context.Background(), test.pvr)
require.NoError(t, err)
}
if test.needDelete {
err = r.client.Delete(context.Background(), test.pvr)
require.NoError(t, err)
}
if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
r.dataPathMgr = datapath.NewManager(1)
}
if test.sportTime != nil {
r.cancelledPVR[test.pvr.Name] = test.sportTime.Time
}
funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore
if test.needExclusiveUpdateError != nil {
funcExclusiveUpdatePodVolumeRestore = func(context.Context, kbclient.Client, *velerov1api.PodVolumeRestore, func(*velerov1api.PodVolumeRestore)) (bool, error) {
return false, test.needExclusiveUpdateError
}
}
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t)
if test.mockInit {
asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr)
}
if test.mockStart {
asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr)
}
if test.mockCancel {
asyncBR.On("Cancel").Return()
}
if test.mockClose {
asyncBR.On("Close", mock.Anything).Return()
}
return asyncBR
}
if test.mockExposeErr != nil || test.isGetExposeErr || test.isGetExposeNil || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose {
if test.isNilExposer {
r.exposer = nil
} else {
r.exposer = func() exposer.PodVolumeExposer {
ep := exposermockes.NewPodVolumeExposer(t)
if test.mockExposeErr != nil {
if boolptr.IsSetToTrue(test.mockExposeErr) {
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
} else {
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
}
} else if test.notNilExpose {
hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1api.Volume{Name: "test-pvc"}).Result()
hostingPod.ObjectMeta.SetUID("test-uid")
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil)
} else if test.isGetExposeErr {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get PVR exposer"))
} else if test.isGetExposeNil {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
} else if test.isPeekExposeErr {
ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error"))
}
if !test.notMockCleanUp {
ep.On("CleanUp", mock.Anything, mock.Anything).Return()
}
return ep
}()
}
}
if test.needCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.pvr.Name); fsBR == nil {
_, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeRestore, test.pvr.Name, pVBRRequestor,
velerov1api.DefaultNamespace, "", "", datapath.Callbacks{OnCancelled: r.OnDataPathCancelled}, false, velerotest.NewLogger())
require.NoError(t, err)
}
}
actualResult, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: velerov1api.DefaultNamespace,
Name: test.pvr.Name,
},
})
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
}
if test.expectedResult != nil {
assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue)
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}
if test.expected != nil || test.expectDeleted {
pvr := velerov1api.PodVolumeRestore{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.pvr.Name,
Namespace: test.pvr.Namespace,
}, &pvr)
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
require.NoError(t, err)
assert.Equal(t, test.expected.Status.Phase, pvr.Status.Phase)
assert.Contains(t, pvr.Status.Message, test.expected.Status.Message)
assert.Equal(t, test.expected.Finalizers, pvr.Finalizers)
assert.Equal(t, test.expected.Spec.Cancel, pvr.Spec.Cancel)
}
}
if !test.expectDataPath {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.pvr.Name))
} else {
assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.pvr.Name))
}
if test.expectCancelRecord {
assert.Contains(t, r.cancelledPVR, test.pvr.Name)
} else {
assert.Empty(t, r.cancelledPVR)
}
})
}
}
func TestOnPodVolumeRestoreFailed(t *testing.T) {
for _, getErr := range []bool{true, false} {
ctx := context.TODO()
needErrs := []bool{getErr, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...)
require.NoError(t, err)
pvr := pvrBuilder().Result()
namespace := pvr.Namespace
pvrName := pvr.Name
assert.NoError(t, r.client.Create(ctx, pvr))
r.OnDataPathFailed(ctx, namespace, pvrName, fmt.Errorf("Failed to handle %v", pvrName))
updatedPVR := &velerov1api.PodVolumeRestore{}
if getErr {
assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.NotEqual(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase)
assert.True(t, updatedPVR.Status.StartTimestamp.IsZero())
} else {
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.Equal(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase)
assert.True(t, updatedPVR.Status.StartTimestamp.IsZero())
}
}
}
func TestOnPodVolumeRestoreCancelled(t *testing.T) {
for _, getErr := range []bool{true, false} {
ctx := context.TODO()
needErrs := []bool{getErr, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, nil, needErrs...)
require.NoError(t, err)
pvr := pvrBuilder().Result()
namespace := pvr.Namespace
pvrName := pvr.Name
assert.NoError(t, r.client.Create(ctx, pvr))
r.OnDataPathCancelled(ctx, namespace, pvrName)
updatedPVR := &velerov1api.PodVolumeRestore{}
if getErr {
assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.NotEqual(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase)
assert.True(t, updatedPVR.Status.StartTimestamp.IsZero())
} else {
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.Equal(t, velerov1api.PodVolumeRestorePhaseCanceled, updatedPVR.Status.Phase)
assert.False(t, updatedPVR.Status.StartTimestamp.IsZero())
assert.False(t, updatedPVR.Status.CompletionTimestamp.IsZero())
}
}
}
func TestOnPodVolumeRestoreCompleted(t *testing.T) {
tests := []struct {
name string
emptyFSBR bool
isGetErr bool
rebindVolumeErr bool
}{
{
name: "PVR complete",
emptyFSBR: false,
isGetErr: false,
rebindVolumeErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
needErrs := []bool{test.isGetErr, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...)
r.exposer = func() exposer.PodVolumeExposer {
ep := exposermockes.NewPodVolumeExposer(t)
ep.On("CleanUp", mock.Anything, mock.Anything).Return()
return ep
}()
require.NoError(t, err)
pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result()
namespace := pvr.Namespace
ddName := pvr.Name
assert.NoError(t, r.client.Create(ctx, pvr))
r.OnDataPathCompleted(ctx, namespace, ddName, datapath.Result{})
updatedDD := &velerov1api.PodVolumeRestore{}
if test.isGetErr {
assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, updatedDD))
assert.Equal(t, velerov1api.PodVolumeRestorePhase(""), updatedDD.Status.Phase)
assert.True(t, updatedDD.Status.CompletionTimestamp.IsZero())
} else {
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, updatedDD))
assert.Equal(t, velerov1api.PodVolumeRestorePhaseCompleted, updatedDD.Status.Phase)
assert.False(t, updatedDD.Status.CompletionTimestamp.IsZero())
}
})
}
}
func TestOnPodVolumeRestoreProgress(t *testing.T) {
totalBytes := int64(1024)
bytesDone := int64(512)
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
progress uploader.Progress
needErrs []bool
}{
{
name: "patch in progress phase success",
pvr: pvrBuilder().Result(),
progress: uploader.Progress{
TotalBytes: totalBytes,
BytesDone: bytesDone,
},
},
{
name: "failed to get pvr",
pvr: pvrBuilder().Result(),
needErrs: []bool{true, false, false, false},
},
{
name: "failed to patch pvr",
pvr: pvrBuilder().Result(),
needErrs: []bool{false, false, true, false},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, test.needErrs...)
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{})
}()
pvr := pvrBuilder().Result()
namespace := pvr.Namespace
pvrName := pvr.Name
assert.NoError(t, r.client.Create(context.Background(), pvr))
// Create a Progress object
progress := &uploader.Progress{
TotalBytes: totalBytes,
BytesDone: bytesDone,
}
r.OnDataPathProgress(ctx, namespace, pvrName, progress)
if len(test.needErrs) != 0 && !test.needErrs[0] {
updatedPVR := &velerov1api.PodVolumeRestore{}
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.Equal(t, test.progress.TotalBytes, updatedPVR.Status.Progress.TotalBytes)
assert.Equal(t, test.progress.BytesDone, updatedPVR.Status.Progress.BytesDone)
}
})
}
}
func TestFindPVBForRestorePod(t *testing.T) {
needErrs := []bool{false, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...)
require.NoError(t, err)
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
pod *corev1api.Pod
checkFunc func(*velerov1api.PodVolumeRestore, []reconcile.Request)
}{
{
name: "find pvr for pod",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)
// Assert that the request contains the correct namespaced name
assert.Equal(t, pvr.Namespace, requests[0].Namespace)
assert.Equal(t, pvr.Name, requests[0].Name)
},
}, {
name: "no selected label found for pod",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Empty(t, requests)
},
}, {
name: "no matched pod",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: "non-existing-pvr"}).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
assert.Empty(t, requests)
},
},
{
name: "pvr not accept",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
assert.Empty(t, requests)
},
},
}
for _, test := range tests {
ctx := context.Background()
assert.NoError(t, r.client.Create(ctx, test.pod))
assert.NoError(t, r.client.Create(ctx, test.pvr))
// Call the findSnapshotRestoreForPod function
requests := r.findPVRForRestorePod(context.Background(), test.pod)
test.checkFunc(test.pvr, requests)
r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{})
if test.pod != nil {
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
}
}
}
func TestOnPVRPrepareTimeout(t *testing.T) {
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
needErrs []error
expected *velerov1api.PodVolumeRestore
}{
{
name: "update fail",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
expected: pvrBuilder().Result(),
},
{
name: "update interrupted",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
expected: pvrBuilder().Result(),
},
{
name: "succeed",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, nil, nil},
expected: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(),
},
}
for _, test := range tests {
ctx := context.Background()
r, err := initPodVolumeRestoreReconcilerWithError(nil, []client.Object{}, test.needErrs...)
require.NoError(t, err)
err = r.client.Create(ctx, test.pvr)
require.NoError(t, err)
r.onPrepareTimeout(ctx, test.pvr)
pvr := velerov1api.PodVolumeRestore{}
_ = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.pvr.Name,
Namespace: test.pvr.Namespace,
}, &pvr)
assert.Equal(t, test.expected.Status.Phase, pvr.Status.Phase)
}
}
func TestTryCancelPVR(t *testing.T) {
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
needErrs []error
succeeded bool
expectedErr string
}{
{
name: "update fail",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
},
{
name: "cancel by others",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
},
{
name: "succeed",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, nil, nil},
succeeded: true,
},
}
for _, test := range tests {
ctx := context.Background()
r, err := initPodVolumeRestoreReconcilerWithError(nil, []client.Object{}, test.needErrs...)
require.NoError(t, err)
err = r.client.Create(ctx, test.pvr)
require.NoError(t, err)
r.tryCancelPodVolumeRestore(ctx, test.pvr, "")
if test.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.expectedErr)
}
}
}
func TestUpdatePVRWithRetry(t *testing.T) {
namespacedName := types.NamespacedName{
Name: pvrName,
Namespace: "velero",
}
// Define test cases
testCases := []struct {
Name string
needErrs []bool
noChange bool
ExpectErr bool
}{
{
Name: "SuccessOnFirstAttempt",
},
{
Name: "Error get",
needErrs: []bool{true, false, false, false, false},
ExpectErr: true,
},
{
Name: "Error update",
needErrs: []bool{false, false, true, false, false},
ExpectErr: true,
},
{
Name: "no change",
noChange: true,
needErrs: []bool{false, false, true, false, false},
},
{
Name: "Conflict with error timeout",
needErrs: []bool{false, false, false, false, true},
ExpectErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
ctx, cancelFunc := context.WithTimeout(context.TODO(), time.Second*5)
defer cancelFunc()
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, tc.needErrs...)
require.NoError(t, err)
err = r.client.Create(ctx, pvrBuilder().Result())
require.NoError(t, err)
updateFunc := func(pvr *velerov1api.PodVolumeRestore) bool {
if tc.noChange {
return false
}
pvr.Spec.Cancel = true
return true
}
err = UpdatePVRWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc)
if tc.ExpectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@ -0,0 +1,125 @@
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
import (
context "context"
client "sigs.k8s.io/controller-runtime/pkg/client"
exposer "github.com/vmware-tanzu/velero/pkg/exposer"
mock "github.com/stretchr/testify/mock"
time "time"
corev1api "k8s.io/api/core/v1"
)
// PodVolumeExposer is an autogenerated mock type for the PodVolumeExposer type
type PodVolumeExposer struct {
mock.Mock
}
// CleanUp provides a mock function with given fields: _a0, _a1
func (_m *PodVolumeExposer) CleanUp(_a0 context.Context, _a1 corev1api.ObjectReference) {
_m.Called(_a0, _a1)
}
// DiagnoseExpose provides a mock function with given fields: _a0, _a1
func (_m *PodVolumeExposer) DiagnoseExpose(_a0 context.Context, _a1 corev1api.ObjectReference) string {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for DiagnoseExpose")
}
var r0 string
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference) string); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Expose provides a mock function with given fields: _a0, _a1, _a2
func (_m *PodVolumeExposer) Expose(_a0 context.Context, _a1 corev1api.ObjectReference, _a2 exposer.PodVolumeExposeParam) error {
ret := _m.Called(_a0, _a1, _a2)
if len(ret) == 0 {
panic("no return value specified for Expose")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, exposer.PodVolumeExposeParam) error); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetExposed provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *PodVolumeExposer) GetExposed(_a0 context.Context, _a1 corev1api.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
if len(ret) == 0 {
panic("no return value specified for GetExposed")
}
var r0 *exposer.ExposeResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok {
return rf(_a0, _a1, _a2, _a3, _a4)
}
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) *exposer.ExposeResult); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*exposer.ExposeResult)
}
}
if rf, ok := ret.Get(1).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) error); ok {
r1 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PeekExposed provides a mock function with given fields: _a0, _a1
func (_m *PodVolumeExposer) PeekExposed(_a0 context.Context, _a1 corev1api.ObjectReference) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for PeekExposed")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewPodVolumeExposer creates a new instance of PodVolumeExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewPodVolumeExposer(t interface {
mock.TestingT
Cleanup(func())
}) *PodVolumeExposer {
mock := &PodVolumeExposer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -98,7 +98,7 @@ func newRestorer(
return return
} }
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed { if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled {
r.resultsLock.Lock() r.resultsLock.Lock()
defer r.resultsLock.Unlock() defer r.resultsLock.Unlock()
@ -234,7 +234,7 @@ ForEachVolume:
errs = append(errs, errors.New("timed out waiting for all PodVolumeRestores to complete")) errs = append(errs, errors.New("timed out waiting for all PodVolumeRestores to complete"))
break ForEachVolume break ForEachVolume
case res := <-resultsChan: case res := <-resultsChan:
if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed { if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || res.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled {
errs = append(errs, errors.Errorf("pod volume restore failed: %s", res.Status.Message)) errs = append(errs, errors.Errorf("pod volume restore failed: %s", res.Status.Message))
} }
tracker.TrackPodVolume(res) tracker.TrackPodVolume(res)