From 2ea6ffb63f592e458ccff15501b93ade004d23e0 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 2 Jun 2023 12:17:05 +0800 Subject: [PATCH] data mover restore expose Signed-off-by: Lyndon-Li --- changelogs/unreleased/6357-Lyndon-Li | 1 + pkg/exposer/generic_restore.go | 330 +++++++++++++++++++++++ pkg/exposer/generic_restore_test.go | 376 +++++++++++++++++++++++++++ pkg/util/kube/pod.go | 29 +++ pkg/util/kube/pod_test.go | 93 +++++++ pkg/util/kube/pvc_pv.go | 225 ++++++++++++++++ pkg/util/kube/pvc_pv_test.go | 155 +++++++++++ pkg/util/kube/utils.go | 1 + 8 files changed, 1210 insertions(+) create mode 100644 changelogs/unreleased/6357-Lyndon-Li create mode 100644 pkg/exposer/generic_restore.go create mode 100644 pkg/exposer/generic_restore_test.go create mode 100644 pkg/util/kube/pod_test.go diff --git a/changelogs/unreleased/6357-Lyndon-Li b/changelogs/unreleased/6357-Lyndon-Li new file mode 100644 index 000000000..ce2cb71b6 --- /dev/null +++ b/changelogs/unreleased/6357-Lyndon-Li @@ -0,0 +1 @@ +Add the code for data mover restore expose \ No newline at end of file diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go new file mode 100644 index 000000000..c5c693a2f --- /dev/null +++ b/pkg/exposer/generic_restore.go @@ -0,0 +1,330 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package exposer + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +// GenericRestoreExposer is the interfaces for a generic restore exposer +type GenericRestoreExposer interface { + // Expose starts the process to a restore expose, the expose process may take long time + Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error + + // GetExposed polls the status of the expose. + // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result. + // Otherwise, it returns nil as the expose result without an error. 
+ GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error) + + // RebindVolume unexposes the restored PV and rebind it to the target PVC + RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error + + // CleanUp cleans up any objects generated during the restore expose + CleanUp(context.Context, corev1.ObjectReference) +} + +// NewGenericRestoreExposer creates a new instance of generic restore exposer +func NewGenericRestoreExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) GenericRestoreExposer { + return &genericRestoreExposer{ + kubeClient: kubeClient, + log: log, + } +} + +type genericRestoreExposer struct { + kubeClient kubernetes.Interface + log logrus.FieldLogger +} + +func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, timeout time.Duration) error { + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "target PVC": targetPVCName, + "source namespace": sourceNamespace, + }) + + selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout) + if err != nil { + return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName) + } + + curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed") + + restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode) + if err != nil { + return errors.Wrapf(err, "error to create restore pod") + } + + curLog.WithField("pod name", restorePod.Name).Info("Restore pod is created") + + defer func() { + if err != nil { + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePod.Name, restorePod.Namespace, curLog) + } + }() + + restorePVC, err := e.createRestorePVC(ctx, 
ownerObject, targetPVC, selectedNode) + if err != nil { + return errors.Wrap(err, "error to create restore pvc") + } + + curLog.WithField("pvc name", restorePVC.Name).Info("Restore PVC is created") + + defer func() { + if err != nil { + kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVC.Name, restorePVC.Namespace, curLog) + } + }() + + return nil +} + +func (e *genericRestoreExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) { + restorePodName := ownerObject.Name + restorePVCName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "node": nodeName, + }) + + pod := &corev1.Pod{} + err := nodeClient.Get(ctx, types.NamespacedName{ + Namespace: ownerObject.Namespace, + Name: restorePodName, + }, pod) + if err != nil { + if apierrors.IsNotFound(err) { + curLog.WithField("restore pod", restorePodName).Error("Restore pod is not running in the current node") + return nil, nil + } else { + return nil, errors.Wrapf(err, "error to get restore pod %s", restorePodName) + } + } + + curLog.WithField("pod", pod.Name).Infof("Restore pod is in running state in node %s", pod.Spec.NodeName) + + _, err = kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout) + if err != nil { + return nil, errors.Wrapf(err, "error to wait restore PVC bound, %s", restorePVCName) + } + + curLog.WithField("restore pvc", restorePVCName).Info("Restore PVC is bound") + + return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, PVC: restorePVCName}}, nil +} + +func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) { + restorePodName := ownerObject.Name + restorePVCName := ownerObject.Name + + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) + kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), 
restorePVCName, ownerObject.Namespace, e.log) +} + +func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, timeout time.Duration) error { + restorePodName := ownerObject.Name + restorePVCName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "target PVC": targetPVCName, + "source namespace": sourceNamespace, + }) + + targetPVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(sourceNamespace).Get(ctx, targetPVCName, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "error to get target PVC %s/%s", sourceNamespace, targetPVCName) + } + + restorePV, err := kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout) + if err != nil { + return errors.Wrapf(err, "error to get PV from restore PVC %s", restorePVCName) + } + + orgReclaim := restorePV.Spec.PersistentVolumeReclaimPolicy + + curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is retrieved") + + retained, err := kube.SetPVReclaimPolicy(ctx, e.kubeClient.CoreV1(), restorePV, corev1.PersistentVolumeReclaimRetain) + if err != nil { + return errors.Wrapf(err, "error to retain PV %s", restorePV.Name) + } + + curLog.WithField("restore PV", restorePV.Name).WithField("retained", (retained != nil)).Info("Restore PV is retained") + + defer func() { + if retained != nil { + curLog.WithField("retained PV", retained.Name).Info("Deleting retained PV on error") + kube.DeletePVIfAny(ctx, e.kubeClient.CoreV1(), retained.Name, curLog) + } + }() + + if retained != nil { + restorePV = retained + } + + err = kube.EnsureDeletePod(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, timeout) + if err != nil { + return errors.Wrapf(err, "error to delete restore pod %s", restorePodName) + } + + err = kube.EnsureDeletePVC(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout) + 
if err != nil { + return errors.Wrapf(err, "error to delete restore PVC %s", restorePVCName) + } + + curLog.WithField("restore PVC", restorePVCName).Info("Restore PVC is deleted") + + _, err = kube.RebindPVC(ctx, e.kubeClient.CoreV1(), targetPVC, restorePV.Name) + if err != nil { + return errors.Wrapf(err, "error to rebind target PVC %s/%s to %s", targetPVC.Namespace, targetPVC.Name, restorePV.Name) + } + + curLog.WithField("target PVC", fmt.Sprintf("%s/%s", targetPVC.Namespace, targetPVC.Name)).WithField("restore PV", restorePV.Name).Info("Target PVC is rebound to restore PV") + + var matchLabel map[string]string + if targetPVC.Spec.Selector != nil { + matchLabel = targetPVC.Spec.Selector.MatchLabels + } + + restorePVName := restorePV.Name + restorePV, err = kube.ResetPVBinding(ctx, e.kubeClient.CoreV1(), restorePV, matchLabel) + if err != nil { + return errors.Wrapf(err, "error to reset binding info for restore PV %s", restorePVName) + } + + curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is rebound") + + restorePV, err = kube.WaitPVBound(ctx, e.kubeClient.CoreV1(), restorePV.Name, targetPVC.Name, targetPVC.Namespace, timeout) + if err != nil { + return errors.Wrapf(err, "error to wait restore PV bound, restore PV %s", restorePVName) + } + + curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is ready") + + retained = nil + + _, err = kube.SetPVReclaimPolicy(ctx, e.kubeClient.CoreV1(), restorePV, orgReclaim) + if err != nil { + curLog.WithField("restore PV", restorePV.Name).WithError(err).Warn("Restore PV's reclaim policy is not restored") + } else { + curLog.WithField("restore PV", restorePV.Name).Info("Restore PV's reclaim policy is restored") + } + + return nil +} + +func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) { + restorePodName := ownerObject.Name + restorePVCName := ownerObject.Name + + var gracePeriod 
int64 = 0 + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: restorePodName, + Namespace: ownerObject.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + Labels: label, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: restorePodName, + Image: "alpine:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"sleep", "infinity"}, + VolumeMounts: []corev1.VolumeMount{{ + Name: restorePVCName, + MountPath: "/" + restorePVCName, + }}, + }, + }, + TerminationGracePeriodSeconds: &gracePeriod, + Volumes: []corev1.Volume{{ + Name: restorePVCName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: restorePVCName, + }, + }, + }}, + NodeName: selectedNode, + }, + } + + return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) +} + +func (e *genericRestoreExposer) createRestorePVC(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim, selectedNode string) (*corev1.PersistentVolumeClaim, error) { + restorePVCName := ownerObject.Name + + pvcObj := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ownerObject.Namespace, + Name: restorePVCName, + Labels: targetPVC.Labels, + Annotations: targetPVC.Annotations, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: targetPVC.Spec.AccessModes, + StorageClassName: targetPVC.Spec.StorageClassName, + VolumeMode: targetPVC.Spec.VolumeMode, + Resources: targetPVC.Spec.Resources, + }, + } + + if selectedNode != "" { + pvcObj.Annotations = 
map[string]string{ + kube.KubeAnnSelectedNode: selectedNode, + } + } + + return e.kubeClient.CoreV1().PersistentVolumeClaims(pvcObj.Namespace).Create(ctx, pvcObj, metav1.CreateOptions{}) +} diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go new file mode 100644 index 000000000..2b7f4f0a6 --- /dev/null +++ b/pkg/exposer/generic_restore_test.go @@ -0,0 +1,376 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + + corev1api "k8s.io/api/core/v1" + clientTesting "k8s.io/client-go/testing" +) + +func TestRestoreExpose(t *testing.T) { + restore := &velerov1.Restore{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Restore", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-restore", + UID: "fake-uid", + }, + } + + targetPVCObj := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "fake-target-pvc", + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerRestore *velerov1.Restore + 
targetPVCName string + sourceNamespace string + kubeReactors []reactor + err string + }{ + { + name: "wait target pvc consumed fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + err: "error to wait target PVC consumed, fake-ns/fake-target-pvc: error to wait for PVC: error to get pvc fake-ns/fake-target-pvc: persistentvolumeclaims \"fake-target-pvc\" not found", + }, + { + name: "create restore pod fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + }, + kubeReactors: []reactor{ + { + verb: "create", + resource: "pods", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create restore pod: fake-create-error", + }, + { + name: "create restore pvc fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + }, + kubeReactors: []reactor{ + { + verb: "create", + resource: "persistentvolumeclaims", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create restore pvc: fake-create-error", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + exposer := genericRestoreExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerRestore != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerRestore.Kind, + Namespace: test.ownerRestore.Namespace, + Name: test.ownerRestore.Name, + UID: test.ownerRestore.UID, + APIVersion: test.ownerRestore.APIVersion, + } + } + + err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, time.Millisecond) + assert.EqualError(t, err, test.err) + }) + } +} + +func TestRebindVolume(t *testing.T) { + restore := &velerov1.Restore{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Restore", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-restore", + UID: "fake-uid", + }, + } + + targetPVCObj := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "fake-target-pvc", + }, + } + + restorePVCObj := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-restore", + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + VolumeName: "fake-restore-pv", + }, + } + + restorePVObj := &corev1api.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-restore-pv", + }, + Spec: corev1api.PersistentVolumeSpec{ + PersistentVolumeReclaimPolicy: corev1api.PersistentVolumeReclaimDelete, + }, + } + + restorePod := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-restore", + }, + } + + hookCount := 0 + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerRestore *velerov1.Restore + targetPVCName string + sourceNamespace string + 
kubeReactors []reactor + err string + }{ + { + name: "get target pvc fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + err: "error to get target PVC fake-ns/fake-target-pvc: persistentvolumeclaims \"fake-target-pvc\" not found", + }, + { + name: "wait restore pvc bound fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + }, + err: "error to get PV from restore PVC fake-restore: error to wait for rediness of PVC: error to get pvc velero/fake-restore: persistentvolumeclaims \"fake-restore\" not found", + }, + { + name: "retain target pv fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + restorePVCObj, + restorePVObj, + }, + kubeReactors: []reactor{ + { + verb: "patch", + resource: "persistentvolumes", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-patch-error") + }, + }, + }, + err: "error to retain PV fake-restore-pv: error patching PV: fake-patch-error", + }, + { + name: "delete restore pod fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + restorePVCObj, + restorePVObj, + restorePod, + }, + kubeReactors: []reactor{ + { + verb: "delete", + resource: "pods", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-delete-error") + }, + }, + }, + err: "error to delete restore pod fake-restore: error to delete pod fake-restore: fake-delete-error", + }, + { + name: "delete restore pvc fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + restorePVCObj, + restorePVObj, + 
restorePod, + }, + kubeReactors: []reactor{ + { + verb: "delete", + resource: "persistentvolumeclaims", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-delete-error") + }, + }, + }, + err: "error to delete restore PVC fake-restore: error to delete pvc fake-restore: fake-delete-error", + }, + { + name: "rebind target pvc fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + restorePVCObj, + restorePVObj, + restorePod, + }, + kubeReactors: []reactor{ + { + verb: "patch", + resource: "persistentvolumeclaims", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-patch-error") + }, + }, + }, + err: "error to rebind target PVC fake-ns/fake-target-pvc to fake-restore-pv: error patching PVC: fake-patch-error", + }, + { + name: "reset pv binding fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + restorePVCObj, + restorePVObj, + restorePod, + }, + kubeReactors: []reactor{ + { + verb: "patch", + resource: "persistentvolumes", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + if hookCount == 0 { + hookCount++ + return false, nil, nil + } else { + return true, nil, errors.New("fake-patch-error") + } + }, + }, + }, + err: "error to reset binding info for restore PV fake-restore-pv: error patching PV: fake-patch-error", + }, + { + name: "wait restore PV bound fail", + targetPVCName: "fake-target-pvc", + sourceNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + restorePVCObj, + restorePVObj, + restorePod, + }, + err: "error to wait restore PV bound, restore PV fake-restore-pv: error to wait for bound of PV: timed out waiting for 
the condition", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + exposer := genericRestoreExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerRestore != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerRestore.Kind, + Namespace: test.ownerRestore.Namespace, + Name: test.ownerRestore.Name, + UID: test.ownerRestore.UID, + APIVersion: test.ownerRestore.APIVersion, + } + } + + hookCount = 0 + + err := exposer.RebindVolume(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, time.Millisecond) + assert.EqualError(t, err, test.err) + }) + } +} diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index be874a37a..c1464a3d6 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -17,12 +17,14 @@ package kube import ( "context" + "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -81,3 +83,30 @@ func DeletePodIfAny(ctx context.Context, podGetter corev1client.CoreV1Interface, } } } + +// EnsureDeletePod asserts the existence of a pod by name, deletes it and waits for its disappearance and returns errors on any failure +func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, timeout time.Duration) error { + err := podGetter.Pods(namespace).Delete(ctx, pod, metav1.DeleteOptions{}) + if err != nil { + return errors.Wrapf(err, "error to delete pod %s", pod) + } + + err = wait.PollImmediate(waitInternal, timeout, func() 
(bool, error) { + _, err := podGetter.Pods(namespace).Get(ctx, pod, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + + return false, errors.Wrapf(err, "error to get pod %s", pod) + } + + return false, nil + }) + + if err != nil { + return errors.Wrapf(err, "error to assure pod is deleted, %s", pod) + } + + return nil +} diff --git a/pkg/util/kube/pod_test.go b/pkg/util/kube/pod_test.go new file mode 100644 index 000000000..8131b02c7 --- /dev/null +++ b/pkg/util/kube/pod_test.go @@ -0,0 +1,93 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kube + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + corev1api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + clientTesting "k8s.io/client-go/testing" +) + +func TestEnsureDeletePod(t *testing.T) { + podObject := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "fake-pod", + }, + } + + tests := []struct { + name string + clientObj []runtime.Object + podName string + namespace string + reactors []reactor + err string + }{ + { + name: "delete fail", + podName: "fake-pod", + namespace: "fake-ns", + err: "error to delete pod fake-pod: pods \"fake-pod\" not found", + }, + { + name: "wait fail", + podName: "fake-pod", + namespace: "fake-ns", + clientObj: []runtime.Object{podObject}, + reactors: []reactor{ + { + verb: "get", + resource: "pods", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + err: "error to assure pod is deleted, fake-pod: error to get pod fake-pod: fake-get-error", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.clientObj...) 
+ + for _, reactor := range test.reactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + var kubeClient kubernetes.Interface = fakeKubeClient + + err := EnsureDeletePod(context.Background(), kubeClient.CoreV1(), test.podName, test.namespace, time.Millisecond) + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/util/kube/pvc_pv.go b/pkg/util/kube/pvc_pv.go index f04676d1f..a24525894 100644 --- a/pkg/util/kube/pvc_pv.go +++ b/pkg/util/kube/pvc_pv.go @@ -18,17 +18,23 @@ package kube import ( "context" + "encoding/json" "fmt" "time" + jsonpatch "github.com/evanphx/json-patch" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + + storagev1api "k8s.io/api/storage/v1" + storagev1 "k8s.io/client-go/kubernetes/typed/storage/v1" ) const ( @@ -77,3 +83,222 @@ func WaitPVCBound(ctx context.Context, pvcGetter corev1client.CoreV1Interface, return pv, err } + +// DeletePVIfAny deletes a PV by name if it exists, and log an error when the deletion fails +func DeletePVIfAny(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvName string, log logrus.FieldLogger) { + err := pvGetter.PersistentVolumes().Delete(ctx, pvName, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + log.WithError(err).Debugf("Abort deleting PV, it doesn't exist, %s", pvName) + } else { + log.WithError(err).Errorf("Failed to delete PV %s", pvName) + } + } +} + +// EnsureDeletePVC asserts the existence of a PVC by name, deletes it and waits for its disappearance and returns errors on any failure +func EnsureDeletePVC(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string, 
timeout time.Duration) error { + err := pvcGetter.PersistentVolumeClaims(namespace).Delete(ctx, pvc, metav1.DeleteOptions{}) + if err != nil { + return errors.Wrapf(err, "error to delete pvc %s", pvc) + } + + err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + _, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + + return false, errors.Wrapf(err, "error to get pvc %s", pvc) + } + + return false, nil + }) + + if err != nil { + return errors.Wrapf(err, "error to retrieve pvc info for %s", pvc) + } + + return nil +} + +// RebindPVC rebinds a PVC by modifying its VolumeName to the specific PV +func RebindPVC(ctx context.Context, pvcGetter corev1client.CoreV1Interface, + pvc *corev1api.PersistentVolumeClaim, pv string) (*corev1api.PersistentVolumeClaim, error) { + origBytes, err := json.Marshal(pvc) + if err != nil { + return nil, errors.Wrap(err, "error marshaling original PVC") + } + + updated := pvc.DeepCopy() + updated.Spec.VolumeName = pv + delete(updated.Annotations, KubeAnnBindCompleted) + delete(updated.Annotations, KubeAnnBoundByController) + + updatedBytes, err := json.Marshal(updated) + if err != nil { + return nil, errors.Wrap(err, "error marshaling updated PV") + } + + patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes) + if err != nil { + return nil, errors.Wrap(err, "error creating json merge patch for PV") + } + + updated, err = pvcGetter.PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error patching PVC") + } + + return updated, nil +} + +// ResetPVBinding resets the binding info of a PV and adds the required labels so as to make it ready for binding +func ResetPVBinding(ctx context.Context, pvGetter corev1client.CoreV1Interface, pv *corev1api.PersistentVolume, labels map[string]string) 
(*corev1api.PersistentVolume, error) { + origBytes, err := json.Marshal(pv) + if err != nil { + return nil, errors.Wrap(err, "error marshaling original PV") + } + + updated := pv.DeepCopy() + updated.Spec.ClaimRef = nil + delete(updated.Annotations, KubeAnnBoundByController) + + if labels != nil { + if updated.Labels == nil { + updated.Labels = make(map[string]string) + } + + for k, v := range labels { + if _, ok := updated.Labels[k]; !ok { + updated.Labels[k] = v + } + } + } + + updatedBytes, err := json.Marshal(updated) + if err != nil { + return nil, errors.Wrap(err, "error marshaling updated PV") + } + + patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes) + if err != nil { + return nil, errors.Wrap(err, "error creating json merge patch for PV") + } + + updated, err = pvGetter.PersistentVolumes().Patch(ctx, pv.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error patching PV") + } + + return updated, nil +} + +// SetPVReclaimPolicy sets the specified reclaim policy to a PV +func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interface, pv *corev1api.PersistentVolume, + policy corev1api.PersistentVolumeReclaimPolicy) (*corev1api.PersistentVolume, error) { + if pv.Spec.PersistentVolumeReclaimPolicy == policy { + return nil, nil + } + + origBytes, err := json.Marshal(pv) + if err != nil { + return nil, errors.Wrap(err, "error marshaling original PV") + } + + updated := pv.DeepCopy() + updated.Spec.PersistentVolumeReclaimPolicy = policy + + updatedBytes, err := json.Marshal(updated) + if err != nil { + return nil, errors.Wrap(err, "error marshaling updated PV") + } + + patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes) + if err != nil { + return nil, errors.Wrap(err, "error creating json merge patch for PV") + } + + updated, err = pvGetter.PersistentVolumes().Patch(ctx, pv.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + if err 
!= nil { + return nil, errors.Wrap(err, "error patching PV") + } + + return updated, nil +} + +// WaitPVCConsumed waits for a PVC to be consumed by a pod so that the selected node is set by the pod scheduling; or does +// nothing if the consuming doesn't affect the PV provision. +// The latest PVC and the selected node will be returned. +func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string, + storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) { + selectedNode := "" + var updated *corev1api.PersistentVolumeClaim + var storageClass *storagev1api.StorageClass + err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + tmpPVC, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc) + } + + if tmpPVC.Spec.StorageClassName != nil && storageClass == nil { + storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName) + } + } + + if storageClass != nil { + if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer { + selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode] + if selectedNode == "" { + return false, nil + } + } + } + + updated = tmpPVC + + return true, nil + }) + + if err != nil { + return "", nil, errors.Wrap(err, "error to wait for PVC") + } + + return selectedNode, updated, err +} + +// WaitPVBound wait for binding of a PV specified by name and returns the bound PV object +func WaitPVBound(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvName string, pvcName string, pvcNamespace string, timeout time.Duration) (*corev1api.PersistentVolume, error) { + var 
updated *corev1api.PersistentVolume + err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) { + tmpPV, err := pvGetter.PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, fmt.Sprintf("failed to get pv %s", pvName)) + } + + if tmpPV.Spec.ClaimRef == nil { + return false, nil + } + + if tmpPV.Spec.ClaimRef.Name != pvcName { + return false, nil + } + + if tmpPV.Spec.ClaimRef.Namespace != pvcNamespace { + return false, nil + } + + updated = tmpPV + + return true, nil + }) + + if err != nil { + return nil, errors.Wrap(err, "error to wait for bound of PV") + } else { + return updated, nil + } +} diff --git a/pkg/util/kube/pvc_pv_test.go b/pkg/util/kube/pvc_pv_test.go index 4713e2e3a..d5e8736d2 100644 --- a/pkg/util/kube/pvc_pv_test.go +++ b/pkg/util/kube/pvc_pv_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/fake" corev1api "k8s.io/api/core/v1" + storagev1api "k8s.io/api/storage/v1" clientTesting "k8s.io/client-go/testing" ) @@ -129,3 +130,157 @@ func TestWaitPVCBound(t *testing.T) { }) } } + +func TestWaitPVCConsumed(t *testing.T) { + storageClass := "fake-storage-class" + bindModeImmediate := storagev1api.VolumeBindingImmediate + bindModeWait := storagev1api.VolumeBindingWaitForFirstConsumer + + pvcObject := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + Name: "fake-pvc-1", + }, + } + + pvcObjectWithSC := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + Name: "fake-pvc-2", + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + StorageClassName: &storageClass, + }, + } + + scObjWithoutBindMode := &storagev1api.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-storage-class", + }, + } + + scObjWaitBind := &storagev1api.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-storage-class", + }, + VolumeBindingMode: &bindModeWait, + } + + scObjWithImmidateBinding := 
&storagev1api.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-storage-class", + }, + VolumeBindingMode: &bindModeImmediate, + } + + pvcObjectWithSCAndAnno := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-namespace", + Name: "fake-pvc-3", + Annotations: map[string]string{"volume.kubernetes.io/selected-node": "fake-node-1"}, + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + StorageClassName: &storageClass, + }, + } + + tests := []struct { + name string + pvcName string + pvcNamespace string + kubeClientObj []runtime.Object + kubeReactors []reactor + expectedPVC *corev1api.PersistentVolumeClaim + selectedNode string + err string + }{ + { + name: "get pvc error", + pvcName: "fake-pvc", + pvcNamespace: "fake-namespace", + err: "error to wait for PVC: error to get pvc fake-namespace/fake-pvc: persistentvolumeclaims \"fake-pvc\" not found", + }, + { + name: "success when no sc", + pvcName: "fake-pvc-1", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObject, + }, + expectedPVC: pvcObject, + }, + { + name: "get sc fail", + pvcName: "fake-pvc-2", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectWithSC, + }, + err: "error to wait for PVC: error to get storage class fake-storage-class: storageclasses.storage.k8s.io \"fake-storage-class\" not found", + }, + { + name: "success on sc without binding mode", + pvcName: "fake-pvc-2", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectWithSC, + scObjWithoutBindMode, + }, + expectedPVC: pvcObjectWithSC, + }, + { + name: "success on sc without immediate binding mode", + pvcName: "fake-pvc-2", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectWithSC, + scObjWithImmidateBinding, + }, + expectedPVC: pvcObjectWithSC, + }, + { + name: "pvc annotation miss", + pvcName: "fake-pvc-2", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectWithSC, + 
scObjWaitBind, + }, + err: "error to wait for PVC: timed out waiting for the condition", + }, + { + name: "success on sc without wait binding mode", + pvcName: "fake-pvc-3", + pvcNamespace: "fake-namespace", + kubeClientObj: []runtime.Object{ + pvcObjectWithSCAndAnno, + scObjWaitBind, + }, + expectedPVC: pvcObjectWithSCAndAnno, + selectedNode: "fake-node-1", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + var kubeClient kubernetes.Interface = fakeKubeClient + + selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond) + + if err != nil { + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, test.expectedPVC, pvc) + assert.Equal(t, test.selectedNode, selectedNode) + }) + } +} diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index 35d4f84d4..3d8d4e3ef 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -47,6 +47,7 @@ const ( KubeAnnBoundByController = "pv.kubernetes.io/bound-by-controller" KubeAnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by" KubeAnnMigratedTo = "pv.kubernetes.io/migrated-to" + KubeAnnSelectedNode = "volume.kubernetes.io/selected-node" ) // NamespaceAndName returns a string in the format /