Merge branch 'main' into vgdp-ms-pvb-controller

pull/9015/head
Lyndon-Li 2025-06-12 15:52:22 +08:00
commit 33bb51b14d
13 changed files with 1885 additions and 255 deletions

View File

@ -0,0 +1 @@
Fix issue #8959, add VGDP MS PVR controller

View File

@ -15,39 +15,40 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Namespace of the pod containing the volume to be restored
jsonPath: .spec.pod.namespace
name: Namespace
- description: PodVolumeRestore status such as New/InProgress
jsonPath: .status.phase
name: Status
type: string
- description: Name of the pod containing the volume to be restored
jsonPath: .spec.pod.name
name: Pod
- description: Time duration since this PodVolumeRestore was started
jsonPath: .status.startTimestamp
name: Started
type: date
- description: Completed bytes
format: int64
jsonPath: .status.progress.bytesDone
name: Bytes Done
type: integer
- description: Total bytes
format: int64
jsonPath: .status.progress.totalBytes
name: Total Bytes
type: integer
- description: Name of the Backup Storage Location where the backup data is stored
jsonPath: .spec.backupStorageLocation
name: Storage Location
type: string
- description: Time duration since this PodVolumeRestore was created
jsonPath: .metadata.creationTimestamp
name: Age
type: date
- description: Name of the node where the PodVolumeRestore is processed
jsonPath: .status.node
name: Node
type: string
- description: The type of the uploader to handle data transfer
jsonPath: .spec.uploaderType
name: Uploader Type
type: string
- description: Name of the volume to be restored
jsonPath: .spec.volume
name: Volume
type: string
- description: Pod Volume Restore status such as New/InProgress
jsonPath: .status.phase
name: Status
type: string
- description: Pod Volume Restore status such as New/InProgress
format: int64
jsonPath: .status.progress.totalBytes
name: TotalBytes
type: integer
- description: Pod Volume Restore status such as New/InProgress
format: int64
jsonPath: .status.progress.bytesDone
name: BytesDone
type: integer
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1
schema:
openAPIV3Schema:
@ -167,6 +168,13 @@ spec:
status:
description: PodVolumeRestoreStatus is the current status of a PodVolumeRestore.
properties:
acceptedTimestamp:
description: |-
AcceptedTimestamp records the time the pod volume restore is to be prepared.
The server's time is used for AcceptedTimestamp
format: date-time
nullable: true
type: string
completionTimestamp:
description: |-
CompletionTimestamp records the time a restore was completed.
@ -178,11 +186,19 @@ spec:
message:
description: Message is a message about the pod volume restore's status.
type: string
node:
description: Node is name of the node where the pod volume restore
is processed.
type: string
phase:
description: Phase is the current state of the PodVolumeRestore.
enum:
- New
- Accepted
- Prepared
- InProgress
- Canceling
- Canceled
- Completed
- Failed
type: string

View File

@ -107,6 +107,9 @@ const (
// PVBLabel is the label key used to identify the pvb for pvb pod
PVBLabel = "velero.io/pod-volume-backup"
// PVRLabel is the label key used to identify the pvb for pvr pod
PVRLabel = "velero.io/pod-volume-restore"
)
type AsyncOperationIDPrefix string

View File

@ -61,12 +61,16 @@ type PodVolumeRestoreSpec struct {
}
// PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.
// +kubebuilder:validation:Enum=New;InProgress;Completed;Failed
// +kubebuilder:validation:Enum=New;Accepted;Prepared;InProgress;Canceling;Canceled;Completed;Failed
type PodVolumeRestorePhase string
const (
PodVolumeRestorePhaseNew PodVolumeRestorePhase = "New"
PodVolumeRestorePhaseAccepted PodVolumeRestorePhase = "Accepted"
PodVolumeRestorePhasePrepared PodVolumeRestorePhase = "Prepared"
PodVolumeRestorePhaseInProgress PodVolumeRestorePhase = "InProgress"
PodVolumeRestorePhaseCanceling PodVolumeRestorePhase = "Canceling"
PodVolumeRestorePhaseCanceled PodVolumeRestorePhase = "Canceled"
PodVolumeRestorePhaseCompleted PodVolumeRestorePhase = "Completed"
PodVolumeRestorePhaseFailed PodVolumeRestorePhase = "Failed"
)
@ -99,6 +103,16 @@ type PodVolumeRestoreStatus struct {
// about the restore operation.
// +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// AcceptedTimestamp records the time the pod volume restore is to be prepared.
// The server's time is used for AcceptedTimestamp
// +optional
// +nullable
AcceptedTimestamp *metav1.Time `json:"acceptedTimestamp,omitempty"`
// Node is name of the node where the pod volume restore is processed.
// +optional
Node string `json:"node,omitempty"`
}
// TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
@ -107,14 +121,14 @@ type PodVolumeRestoreStatus struct {
// +kubebuilder:object:generate=true
// +kubebuilder:object:root=true
// +kubebuilder:storageversion
// +kubebuilder:printcolumn:name="Namespace",type="string",JSONPath=".spec.pod.namespace",description="Namespace of the pod containing the volume to be restored"
// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".spec.pod.name",description="Name of the pod containing the volume to be restored"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="PodVolumeRestore status such as New/InProgress"
// +kubebuilder:printcolumn:name="Started",type="date",JSONPath=".status.startTimestamp",description="Time duration since this PodVolumeRestore was started"
// +kubebuilder:printcolumn:name="Bytes Done",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Completed bytes"
// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where the backup data is stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this PodVolumeRestore was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the PodVolumeRestore is processed"
// +kubebuilder:printcolumn:name="Uploader Type",type="string",JSONPath=".spec.uploaderType",description="The type of the uploader to handle data transfer"
// +kubebuilder:printcolumn:name="Volume",type="string",JSONPath=".spec.volume",description="Name of the volume to be restored"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="TotalBytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="BytesDone",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type PodVolumeRestore struct {
metav1.TypeMeta `json:",inline"`

View File

@ -1153,6 +1153,10 @@ func (in *PodVolumeRestoreStatus) DeepCopyInto(out *PodVolumeRestoreStatus) {
*out = (*in).DeepCopy()
}
out.Progress = in.Progress
if in.AcceptedTimestamp != nil {
in, out := &in.AcceptedTimestamp, &out.AcceptedTimestamp
*out = (*in).DeepCopy()
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeRestoreStatus.

View File

@ -88,6 +88,11 @@ func (b *PodBuilder) InitContainers(containers ...*corev1api.Container) *PodBuil
return b
}
func (b *PodBuilder) InitContainerState(state corev1api.ContainerState) *PodBuilder {
b.object.Status.InitContainerStatuses = append(b.object.Status.InitContainerStatuses, corev1api.ContainerStatus{State: state})
return b
}
func (b *PodBuilder) Containers(containers ...*corev1api.Container) *PodBuilder {
for _, c := range containers {
b.object.Spec.Containers = append(b.object.Spec.Containers, *c)

View File

@ -103,3 +103,33 @@ func (b *PodVolumeRestoreBuilder) OwnerReference(ownerRef []metav1.OwnerReferenc
b.object.OwnerReferences = ownerRef
return b
}
// Cancel sets the DataDownload's Cancel.
func (b *PodVolumeRestoreBuilder) Cancel(cancel bool) *PodVolumeRestoreBuilder {
b.object.Spec.Cancel = cancel
return b
}
// AcceptedTimestamp sets the PodVolumeRestore's AcceptedTimestamp.
func (b *PodVolumeRestoreBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) *PodVolumeRestoreBuilder {
b.object.Status.AcceptedTimestamp = acceptedTimestamp
return b
}
// Finalizers sets the PodVolumeRestore's Finalizers.
func (b *PodVolumeRestoreBuilder) Finalizers(finalizers []string) *PodVolumeRestoreBuilder {
b.object.Finalizers = finalizers
return b
}
// Message sets the PodVolumeRestore's Message.
func (b *PodVolumeRestoreBuilder) Message(msg string) *PodVolumeRestoreBuilder {
b.object.Status.Message = msg
return b
}
// Message sets the PodVolumeRestore's Node.
func (b *PodVolumeRestoreBuilder) Node(node string) *PodVolumeRestoreBuilder {
b.object.Status.Node = node
return b
}

View File

@ -48,7 +48,6 @@ import (
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
@ -60,7 +59,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
@ -282,28 +280,6 @@ func (s *nodeAgentServer) run() {
s.logger.Info("Starting controllers")
credentialFileStore, err := credentials.NewNamespacedFileStore(
s.mgr.GetClient(),
s.namespace,
credentials.DefaultStoreDirectory(),
filesystem.NewFileSystem(),
)
if err != nil {
s.logger.Fatalf("Failed to create credentials file store: %v", err)
}
credSecretStore, err := credentials.NewNamespacedSecretStore(s.mgr.GetClient(), s.namespace)
if err != nil {
s.logger.Fatalf("Failed to create secret file store: %v", err)
}
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
var loadAffinity *kube.LoadAffinity
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
@ -332,6 +308,10 @@ func (s *nodeAgentServer) run() {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
}
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
dataUploadReconciler := controller.NewDataUploadReconciler(
s.mgr.GetClient(),
s.mgr,
@ -525,7 +505,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
continue
}
if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i],
if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], errors.New("cannot survive from node-agent restart"),
fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,10 @@
package controller
import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
func isLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Spec.UploaderType == uploader.ResticType
}

View File

@ -18,21 +18,43 @@ package controller
import (
"context"
"fmt"
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clientgofake "k8s.io/client-go/kubernetes/fake"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks"
"github.com/vmware-tanzu/velero/pkg/restorehelper"
"github.com/vmware-tanzu/velero/pkg/test"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
func TestShouldProcess(t *testing.T) {
@ -195,11 +217,11 @@ func TestShouldProcess(t *testing.T) {
c := &PodVolumeRestoreReconciler{
logger: logrus.New(),
Client: cli,
client: cli,
clock: &clocks.RealClock{},
}
shouldProcess, _, _ := c.shouldProcess(ctx, c.logger, ts.obj)
shouldProcess, _, _ := shouldProcess(ctx, c.client, c.logger, ts.obj)
require.Equal(t, ts.shouldProcessed, shouldProcess)
})
}
@ -478,7 +500,7 @@ func TestGetInitContainerIndex(t *testing.T) {
}
}
func TestFindVolumeRestoresForPod(t *testing.T) {
func TestFindPVRForTargetPod(t *testing.T) {
pod := &corev1api.Pod{}
pod.UID = "uid"
@ -488,14 +510,14 @@ func TestFindVolumeRestoresForPod(t *testing.T) {
// no matching PVR
reconciler := &PodVolumeRestoreReconciler{
Client: clientBuilder.Build(),
client: clientBuilder.Build(),
logger: logrus.New(),
}
requests := reconciler.findVolumeRestoresForPod(context.Background(), pod)
requests := reconciler.findPVRForTargetPod(context.Background(), pod)
assert.Empty(t, requests)
// contain one matching PVR
reconciler.Client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{
reconciler.client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{
Items: []velerov1api.PodVolumeRestore{
{
ObjectMeta: metav1.ObjectMeta{
@ -515,6 +537,903 @@ func TestFindVolumeRestoresForPod(t *testing.T) {
},
},
}).Build()
requests = reconciler.findVolumeRestoresForPod(context.Background(), pod)
requests = reconciler.findPVRForTargetPod(context.Background(), pod)
assert.Len(t, requests, 1)
}
const pvrName string = "pvr-1"
func pvrBuilder() *builder.PodVolumeRestoreBuilder {
return builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).
BackupStorageLocation("bsl-loc").
SnapshotID("test-snapshot-id")
}
func initPodVolumeRestoreReconciler(objects []runtime.Object, cliObj []client.Object, needError ...bool) (*PodVolumeRestoreReconciler, error) {
var errs = make([]error, 6)
for k, isError := range needError {
if k == 0 && isError {
errs[0] = fmt.Errorf("Get error")
} else if k == 1 && isError {
errs[1] = fmt.Errorf("Create error")
} else if k == 2 && isError {
errs[2] = fmt.Errorf("Update error")
} else if k == 3 && isError {
errs[3] = fmt.Errorf("Patch error")
} else if k == 4 && isError {
errs[4] = apierrors.NewConflict(velerov1api.Resource("podvolumerestore"), pvrName, errors.New("conflict"))
} else if k == 5 && isError {
errs[5] = fmt.Errorf("List error")
}
}
return initPodVolumeRestoreReconcilerWithError(objects, cliObj, errs...)
}
func initPodVolumeRestoreReconcilerWithError(objects []runtime.Object, cliObj []client.Object, needError ...error) (*PodVolumeRestoreReconciler, error) {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
if err != nil {
return nil, err
}
err = corev1api.AddToScheme(scheme)
if err != nil {
return nil, err
}
fakeClient := &FakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(cliObj...).Build(),
}
for k := range needError {
if k == 0 {
fakeClient.getError = needError[0]
} else if k == 1 {
fakeClient.createError = needError[1]
} else if k == 2 {
fakeClient.updateError = needError[2]
} else if k == 3 {
fakeClient.patchError = needError[3]
} else if k == 4 {
fakeClient.updateConflict = needError[4]
} else if k == 5 {
fakeClient.listError = needError[5]
}
}
var fakeKubeClient *clientgofake.Clientset
if len(objects) != 0 {
fakeKubeClient = clientgofake.NewSimpleClientset(objects...)
} else {
fakeKubeClient = clientgofake.NewSimpleClientset()
}
fakeFS := velerotest.NewFakeFileSystem()
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "test-uid", "test-pvc")
_, err = fakeFS.Create(pathGlob)
if err != nil {
return nil, err
}
dataPathMgr := datapath.NewManager(1)
return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil
}
func TestPodVolumeRestoreReconcile(t *testing.T) {
daemonSet := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
APIVersion: appsv1api.SchemeGroupVersion.String(),
},
Spec: appsv1api.DaemonSetSpec{
Template: corev1api.PodTemplateSpec{
Spec: corev1api.PodSpec{
Containers: []corev1api.Container{
{
Image: "fake-image",
},
},
},
},
},
}
node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result()
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
notCreatePVR bool
targetPod *corev1api.Pod
dataMgr *datapath.Manager
needErrs []bool
needCreateFSBR bool
needDelete bool
sportTime *metav1.Time
mockExposeErr *bool
isGetExposeErr bool
isGetExposeNil bool
isPeekExposeErr bool
isNilExposer bool
notNilExpose bool
notMockCleanUp bool
mockInit bool
mockInitErr error
mockStart bool
mockStartErr error
mockCancel bool
mockClose bool
needExclusiveUpdateError error
expected *velerov1api.PodVolumeRestore
expectDeleted bool
expectCancelRecord bool
expectedResult *ctrl.Result
expectedErr string
expectDataPath bool
}{
{
name: "pvr not found",
pvr: pvrBuilder().Result(),
notCreatePVR: true,
},
{
name: "pvr not created in velero default namespace",
pvr: builder.ForPodVolumeRestore("test-ns", pvrName).Result(),
},
{
name: "get dd fail",
pvr: builder.ForPodVolumeRestore("test-ns", pvrName).Result(),
needErrs: []bool{true, false, false, false},
expectedErr: "Get error",
},
{
name: "add finalizer to pvr",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "add finalizer to pvr failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result(),
needErrs: []bool{false, false, true, false},
expectedErr: "error updating PVR velero/pvr-1: Update error",
},
{
name: "pvr is under deletion",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
needDelete: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Result(),
},
{
name: "pvr is under deletion but cancel failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
needErrs: []bool{false, false, true, false},
needDelete: true,
expectedErr: "error updating PVR velero/pvr-1: Update error",
},
{
name: "pvr is under deletion and in terminal state",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(),
sportTime: &metav1.Time{Time: time.Now()},
needDelete: true,
expectDeleted: true,
},
{
name: "pvr is under deletion and in terminal state, but remove finalizer failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(),
needErrs: []bool{false, false, true, false},
needDelete: true,
expectedErr: "error updating PVR velero/pvr-1: Update error",
},
{
name: "delay cancel negative for others",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
sportTime: &metav1.Time{Time: time.Now()},
expectCancelRecord: true,
},
{
name: "delay cancel negative for inProgress",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)},
expectCancelRecord: true,
},
{
name: "delay cancel affirmative for others",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "delay cancel affirmative for inProgress",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "delay cancel failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
needErrs: []bool{false, false, true, false},
sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
expectCancelRecord: true,
},
{
name: "Unknown pvr status",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "new pvr but accept failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(),
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
needErrs: []bool{false, false, true, false},
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectedErr: "error accepting PVR pvr-1: error updating PVR velero/pvr-1: Update error",
},
{
name: "pvr is cancel on accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Result(),
expectCancelRecord: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "pvr expose failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).PodNamespace("test-ns").PodName("test-pod").Finalizers([]string{PodVolumeFinalizer}).Result(),
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
mockExposeErr: boolptr.True(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Message("error to expose PVR").Result(),
expectedErr: "Error to expose restore exposer",
},
{
name: "pvr succeeds for accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).PodNamespace("test-ns").PodName("test-pod").Finalizers([]string{PodVolumeFinalizer}).Result(),
mockExposeErr: boolptr.False(),
notMockCleanUp: true,
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
},
{
name: "prepare timeout on accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Finalizers([]string{PodVolumeFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Message("timeout on preparing PVR").Result(),
},
{
name: "peek error on accepted",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Finalizers([]string{PodVolumeFinalizer}).Result(),
isPeekExposeErr: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Message("found a PVR velero/pvr-1 with expose error: fake-peek-error. mark it as cancel").Result(),
},
{
name: "cancel on pvr",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Node("test-node").Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(),
},
{
name: "Failed to get restore expose on prepared",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
isGetExposeErr: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("exposed PVR is not ready").Result(),
expectedErr: "Error to get PVR exposer",
},
{
name: "Get nil restore expose on prepared",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
isGetExposeNil: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("exposed PVR is not ready").Result(),
expectedErr: "no expose result is available for the current node",
},
{
name: "Error in data path is concurrent limited",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
dataMgr: datapath.NewManager(0),
notNilExpose: true,
notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path init error",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
mockInit: true,
mockInitErr: errors.New("fake-data-path-init-error"),
mockClose: true,
notNilExpose: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("error initializing data path").Result(),
expectedErr: "error initializing asyncBR: fake-data-path-init-error",
},
{
name: "Unable to update status to in progress for pvr",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
needErrs: []bool{false, false, true, false},
mockInit: true,
mockClose: true,
notNilExpose: true,
notMockCleanUp: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "data path start error",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
mockInit: true,
mockStart: true,
mockStartErr: errors.New("fake-data-path-start-error"),
mockClose: true,
notNilExpose: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("error starting data path").Result(),
expectedErr: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error",
},
{
name: "Prepare succeeds",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
mockInit: true,
mockStart: true,
notNilExpose: true,
notMockCleanUp: true,
expectDataPath: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "In progress pvr is not handled by the current node",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "In progress pvr is not set as cancel",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "Cancel pvr in progress with empty FSBR",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "Cancel pvr in progress and patch pvr error",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
needErrs: []bool{false, false, true, false},
needCreateFSBR: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectedErr: "error updating PVR velero/pvr-1: Update error",
expectCancelRecord: true,
expectDataPath: true,
},
{
name: "Cancel pvr in progress succeeds",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
needCreateFSBR: true,
mockCancel: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceling).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectDataPath: true,
expectCancelRecord: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
objs := []runtime.Object{daemonSet, node}
ctlObj := []client.Object{}
if test.targetPod != nil {
ctlObj = append(ctlObj, test.targetPod)
}
r, err := initPodVolumeRestoreReconciler(objs, ctlObj, test.needErrs...)
require.NoError(t, err)
if !test.notCreatePVR {
err = r.client.Create(context.Background(), test.pvr)
require.NoError(t, err)
}
if test.needDelete {
err = r.client.Delete(context.Background(), test.pvr)
require.NoError(t, err)
}
if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
r.dataPathMgr = datapath.NewManager(1)
}
if test.sportTime != nil {
r.cancelledPVR[test.pvr.Name] = test.sportTime.Time
}
funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore
if test.needExclusiveUpdateError != nil {
funcExclusiveUpdatePodVolumeRestore = func(context.Context, kbclient.Client, *velerov1api.PodVolumeRestore, func(*velerov1api.PodVolumeRestore)) (bool, error) {
return false, test.needExclusiveUpdateError
}
}
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t)
if test.mockInit {
asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr)
}
if test.mockStart {
asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr)
}
if test.mockCancel {
asyncBR.On("Cancel").Return()
}
if test.mockClose {
asyncBR.On("Close", mock.Anything).Return()
}
return asyncBR
}
if test.mockExposeErr != nil || test.isGetExposeErr || test.isGetExposeNil || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose {
if test.isNilExposer {
r.exposer = nil
} else {
r.exposer = func() exposer.PodVolumeExposer {
ep := exposermockes.NewPodVolumeExposer(t)
if test.mockExposeErr != nil {
if boolptr.IsSetToTrue(test.mockExposeErr) {
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
} else {
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
}
} else if test.notNilExpose {
hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1api.Volume{Name: "test-pvc"}).Result()
hostingPod.ObjectMeta.SetUID("test-uid")
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil)
} else if test.isGetExposeErr {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get PVR exposer"))
} else if test.isGetExposeNil {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
} else if test.isPeekExposeErr {
ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error"))
}
if !test.notMockCleanUp {
ep.On("CleanUp", mock.Anything, mock.Anything).Return()
}
return ep
}()
}
}
if test.needCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.pvr.Name); fsBR == nil {
_, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeRestore, test.pvr.Name, pVBRRequestor,
velerov1api.DefaultNamespace, "", "", datapath.Callbacks{OnCancelled: r.OnDataPathCancelled}, false, velerotest.NewLogger())
require.NoError(t, err)
}
}
actualResult, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: velerov1api.DefaultNamespace,
Name: test.pvr.Name,
},
})
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
}
if test.expectedResult != nil {
assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue)
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}
if test.expected != nil || test.expectDeleted {
pvr := velerov1api.PodVolumeRestore{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.pvr.Name,
Namespace: test.pvr.Namespace,
}, &pvr)
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
require.NoError(t, err)
assert.Equal(t, test.expected.Status.Phase, pvr.Status.Phase)
assert.Contains(t, pvr.Status.Message, test.expected.Status.Message)
assert.Equal(t, test.expected.Finalizers, pvr.Finalizers)
assert.Equal(t, test.expected.Spec.Cancel, pvr.Spec.Cancel)
}
}
if !test.expectDataPath {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.pvr.Name))
} else {
assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.pvr.Name))
}
if test.expectCancelRecord {
assert.Contains(t, r.cancelledPVR, test.pvr.Name)
} else {
assert.Empty(t, r.cancelledPVR)
}
})
}
}
func TestOnPodVolumeRestoreFailed(t *testing.T) {
for _, getErr := range []bool{true, false} {
ctx := context.TODO()
needErrs := []bool{getErr, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...)
require.NoError(t, err)
pvr := pvrBuilder().Result()
namespace := pvr.Namespace
pvrName := pvr.Name
assert.NoError(t, r.client.Create(ctx, pvr))
r.OnDataPathFailed(ctx, namespace, pvrName, fmt.Errorf("Failed to handle %v", pvrName))
updatedPVR := &velerov1api.PodVolumeRestore{}
if getErr {
assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.NotEqual(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase)
assert.True(t, updatedPVR.Status.StartTimestamp.IsZero())
} else {
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.Equal(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase)
assert.True(t, updatedPVR.Status.StartTimestamp.IsZero())
}
}
}
func TestOnPodVolumeRestoreCancelled(t *testing.T) {
for _, getErr := range []bool{true, false} {
ctx := context.TODO()
needErrs := []bool{getErr, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, nil, needErrs...)
require.NoError(t, err)
pvr := pvrBuilder().Result()
namespace := pvr.Namespace
pvrName := pvr.Name
assert.NoError(t, r.client.Create(ctx, pvr))
r.OnDataPathCancelled(ctx, namespace, pvrName)
updatedPVR := &velerov1api.PodVolumeRestore{}
if getErr {
assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.NotEqual(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase)
assert.True(t, updatedPVR.Status.StartTimestamp.IsZero())
} else {
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.Equal(t, velerov1api.PodVolumeRestorePhaseCanceled, updatedPVR.Status.Phase)
assert.False(t, updatedPVR.Status.StartTimestamp.IsZero())
assert.False(t, updatedPVR.Status.CompletionTimestamp.IsZero())
}
}
}
func TestOnPodVolumeRestoreCompleted(t *testing.T) {
tests := []struct {
name string
emptyFSBR bool
isGetErr bool
rebindVolumeErr bool
}{
{
name: "PVR complete",
emptyFSBR: false,
isGetErr: false,
rebindVolumeErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
needErrs := []bool{test.isGetErr, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...)
r.exposer = func() exposer.PodVolumeExposer {
ep := exposermockes.NewPodVolumeExposer(t)
ep.On("CleanUp", mock.Anything, mock.Anything).Return()
return ep
}()
require.NoError(t, err)
pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result()
namespace := pvr.Namespace
ddName := pvr.Name
assert.NoError(t, r.client.Create(ctx, pvr))
r.OnDataPathCompleted(ctx, namespace, ddName, datapath.Result{})
updatedDD := &velerov1api.PodVolumeRestore{}
if test.isGetErr {
assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, updatedDD))
assert.Equal(t, velerov1api.PodVolumeRestorePhase(""), updatedDD.Status.Phase)
assert.True(t, updatedDD.Status.CompletionTimestamp.IsZero())
} else {
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, updatedDD))
assert.Equal(t, velerov1api.PodVolumeRestorePhaseCompleted, updatedDD.Status.Phase)
assert.False(t, updatedDD.Status.CompletionTimestamp.IsZero())
}
})
}
}
func TestOnPodVolumeRestoreProgress(t *testing.T) {
totalBytes := int64(1024)
bytesDone := int64(512)
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
progress uploader.Progress
needErrs []bool
}{
{
name: "patch in progress phase success",
pvr: pvrBuilder().Result(),
progress: uploader.Progress{
TotalBytes: totalBytes,
BytesDone: bytesDone,
},
},
{
name: "failed to get pvr",
pvr: pvrBuilder().Result(),
needErrs: []bool{true, false, false, false},
},
{
name: "failed to patch pvr",
pvr: pvrBuilder().Result(),
needErrs: []bool{false, false, true, false},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, test.needErrs...)
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{})
}()
pvr := pvrBuilder().Result()
namespace := pvr.Namespace
pvrName := pvr.Name
assert.NoError(t, r.client.Create(context.Background(), pvr))
// Create a Progress object
progress := &uploader.Progress{
TotalBytes: totalBytes,
BytesDone: bytesDone,
}
r.OnDataPathProgress(ctx, namespace, pvrName, progress)
if len(test.needErrs) != 0 && !test.needErrs[0] {
updatedPVR := &velerov1api.PodVolumeRestore{}
assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR))
assert.Equal(t, test.progress.TotalBytes, updatedPVR.Status.Progress.TotalBytes)
assert.Equal(t, test.progress.BytesDone, updatedPVR.Status.Progress.BytesDone)
}
})
}
}
func TestFindPVBForRestorePod(t *testing.T) {
needErrs := []bool{false, false, false, false}
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...)
require.NoError(t, err)
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
pod *corev1api.Pod
checkFunc func(*velerov1api.PodVolumeRestore, []reconcile.Request)
}{
{
name: "find pvr for pod",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)
// Assert that the request contains the correct namespaced name
assert.Equal(t, pvr.Namespace, requests[0].Namespace)
assert.Equal(t, pvr.Name, requests[0].Name)
},
}, {
name: "no selected label found for pod",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Empty(t, requests)
},
}, {
name: "no matched pod",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: "non-existing-pvr"}).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
assert.Empty(t, requests)
},
},
{
name: "pvr not accept",
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Result(),
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
assert.Empty(t, requests)
},
},
}
for _, test := range tests {
ctx := context.Background()
assert.NoError(t, r.client.Create(ctx, test.pod))
assert.NoError(t, r.client.Create(ctx, test.pvr))
// Call the findSnapshotRestoreForPod function
requests := r.findPVRForRestorePod(context.Background(), test.pod)
test.checkFunc(test.pvr, requests)
r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{})
if test.pod != nil {
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
}
}
}
func TestOnPVRPrepareTimeout(t *testing.T) {
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
needErrs []error
expected *velerov1api.PodVolumeRestore
}{
{
name: "update fail",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
expected: pvrBuilder().Result(),
},
{
name: "update interrupted",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
expected: pvrBuilder().Result(),
},
{
name: "succeed",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, nil, nil},
expected: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(),
},
}
for _, test := range tests {
ctx := context.Background()
r, err := initPodVolumeRestoreReconcilerWithError(nil, []client.Object{}, test.needErrs...)
require.NoError(t, err)
err = r.client.Create(ctx, test.pvr)
require.NoError(t, err)
r.onPrepareTimeout(ctx, test.pvr)
pvr := velerov1api.PodVolumeRestore{}
_ = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.pvr.Name,
Namespace: test.pvr.Namespace,
}, &pvr)
assert.Equal(t, test.expected.Status.Phase, pvr.Status.Phase)
}
}
func TestTryCancelPVR(t *testing.T) {
tests := []struct {
name string
pvr *velerov1api.PodVolumeRestore
needErrs []error
succeeded bool
expectedErr string
}{
{
name: "update fail",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
},
{
name: "cancel by others",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
},
{
name: "succeed",
pvr: pvrBuilder().Result(),
needErrs: []error{nil, nil, nil, nil},
succeeded: true,
},
}
for _, test := range tests {
ctx := context.Background()
r, err := initPodVolumeRestoreReconcilerWithError(nil, []client.Object{}, test.needErrs...)
require.NoError(t, err)
err = r.client.Create(ctx, test.pvr)
require.NoError(t, err)
r.tryCancelPodVolumeRestore(ctx, test.pvr, "")
if test.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.expectedErr)
}
}
}
func TestUpdatePVRWithRetry(t *testing.T) {
namespacedName := types.NamespacedName{
Name: pvrName,
Namespace: "velero",
}
// Define test cases
testCases := []struct {
Name string
needErrs []bool
noChange bool
ExpectErr bool
}{
{
Name: "SuccessOnFirstAttempt",
},
{
Name: "Error get",
needErrs: []bool{true, false, false, false, false},
ExpectErr: true,
},
{
Name: "Error update",
needErrs: []bool{false, false, true, false, false},
ExpectErr: true,
},
{
Name: "no change",
noChange: true,
needErrs: []bool{false, false, true, false, false},
},
{
Name: "Conflict with error timeout",
needErrs: []bool{false, false, false, false, true},
ExpectErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
ctx, cancelFunc := context.WithTimeout(context.TODO(), time.Second*5)
defer cancelFunc()
r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, tc.needErrs...)
require.NoError(t, err)
err = r.client.Create(ctx, pvrBuilder().Result())
require.NoError(t, err)
updateFunc := func(pvr *velerov1api.PodVolumeRestore) bool {
if tc.noChange {
return false
}
pvr.Spec.Cancel = true
return true
}
err = UpdatePVRWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc)
if tc.ExpectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@ -0,0 +1,125 @@
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
import (
context "context"
client "sigs.k8s.io/controller-runtime/pkg/client"
exposer "github.com/vmware-tanzu/velero/pkg/exposer"
mock "github.com/stretchr/testify/mock"
time "time"
corev1api "k8s.io/api/core/v1"
)
// PodVolumeExposer is an autogenerated mock type for the PodVolumeExposer type
type PodVolumeExposer struct {
mock.Mock
}
// CleanUp provides a mock function with given fields: _a0, _a1
func (_m *PodVolumeExposer) CleanUp(_a0 context.Context, _a1 corev1api.ObjectReference) {
_m.Called(_a0, _a1)
}
// DiagnoseExpose provides a mock function with given fields: _a0, _a1
func (_m *PodVolumeExposer) DiagnoseExpose(_a0 context.Context, _a1 corev1api.ObjectReference) string {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for DiagnoseExpose")
}
var r0 string
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference) string); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Expose provides a mock function with given fields: _a0, _a1, _a2
func (_m *PodVolumeExposer) Expose(_a0 context.Context, _a1 corev1api.ObjectReference, _a2 exposer.PodVolumeExposeParam) error {
ret := _m.Called(_a0, _a1, _a2)
if len(ret) == 0 {
panic("no return value specified for Expose")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, exposer.PodVolumeExposeParam) error); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetExposed provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *PodVolumeExposer) GetExposed(_a0 context.Context, _a1 corev1api.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
if len(ret) == 0 {
panic("no return value specified for GetExposed")
}
var r0 *exposer.ExposeResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok {
return rf(_a0, _a1, _a2, _a3, _a4)
}
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) *exposer.ExposeResult); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*exposer.ExposeResult)
}
}
if rf, ok := ret.Get(1).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) error); ok {
r1 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PeekExposed provides a mock function with given fields: _a0, _a1
func (_m *PodVolumeExposer) PeekExposed(_a0 context.Context, _a1 corev1api.ObjectReference) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for PeekExposed")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewPodVolumeExposer creates a new instance of PodVolumeExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewPodVolumeExposer(t interface {
mock.TestingT
Cleanup(func())
}) *PodVolumeExposer {
mock := &PodVolumeExposer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -98,7 +98,7 @@ func newRestorer(
return
}
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed {
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled {
r.resultsLock.Lock()
defer r.resultsLock.Unlock()
@ -234,7 +234,7 @@ ForEachVolume:
errs = append(errs, errors.New("timed out waiting for all PodVolumeRestores to complete"))
break ForEachVolume
case res := <-resultsChan:
if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed {
if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || res.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled {
errs = append(errs, errors.Errorf("pod volume restore failed: %s", res.Status.Message))
}
tracker.TrackPodVolume(res)