data mover restore expose
Signed-off-by: Lyndon-Li <>pull/6357/head
@ -0,0 +1 @@
Add the code for data mover restore expose
@ -0,0 +1,330 @@
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package exposer
import (
corev1 ""
apierrors ""
metav1 ""
// GenericRestoreExposer is the interfaces for a generic restore exposer
type GenericRestoreExposer interface {
// Expose starts the process to a restore expose, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error
// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
// Otherwise, it returns nil as the expose result without an error.
GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)
// RebindVolume unexposes the restored PV and rebind it to the target PVC
RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error
// CleanUp cleans up any objects generated during the restore expose
CleanUp(context.Context, corev1.ObjectReference)
// NewGenericRestoreExposer creates a new instance of generic restore exposer
func NewGenericRestoreExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) GenericRestoreExposer {
return &genericRestoreExposer{
kubeClient: kubeClient,
log: log,
type genericRestoreExposer struct {
kubeClient kubernetes.Interface
log logrus.FieldLogger
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, timeout time.Duration) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
"source namespace": sourceNamespace,
selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout)
if err != nil {
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName)
curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")
restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
curLog.WithField("pod name", restorePod.Name).Info("Restore pod is created")
defer func() {
if err != nil {
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePod.Name, restorePod.Namespace, curLog)
restorePVC, err := e.createRestorePVC(ctx, ownerObject, targetPVC, selectedNode)
if err != nil {
return errors.Wrap(err, "error to create restore pvc")
curLog.WithField("pvc name", restorePVC.Name).Info("Restore PVC is created")
defer func() {
if err != nil {
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVC.Name, restorePVC.Namespace, curLog)
return nil
func (e *genericRestoreExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"node": nodeName,
pod := &corev1.Pod{}
err := nodeClient.Get(ctx, types.NamespacedName{
Namespace: ownerObject.Namespace,
Name: restorePodName,
}, pod)
if err != nil {
if apierrors.IsNotFound(err) {
curLog.WithField("backup pod", restorePodName).Error("Backup pod is not running in the current node")
return nil, nil
} else {
return nil, errors.Wrapf(err, "error to get backup pod %s", restorePodName)
curLog.WithField("pod", pod.Name).Infof("Restore pod is in running state in node %s", pod.Spec.NodeName)
_, err = kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout)
if err != nil {
return nil, errors.Wrapf(err, "error to wait restore PVC bound, %s", restorePVCName)
curLog.WithField("restore pvc", restorePVCName).Info("Restore PVC is bound")
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, PVC: restorePVCName}}, nil
func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, e.log)
func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, timeout time.Duration) error {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
"source namespace": sourceNamespace,
targetPVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(sourceNamespace).Get(ctx, targetPVCName, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "error to get target PVC %s/%s", sourceNamespace, targetPVCName)
restorePV, err := kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to get PV from restore PVC %s", restorePVCName)
orgReclaim := restorePV.Spec.PersistentVolumeReclaimPolicy
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is retrieved")
retained, err := kube.SetPVReclaimPolicy(ctx, e.kubeClient.CoreV1(), restorePV, corev1.PersistentVolumeReclaimRetain)
if err != nil {
return errors.Wrapf(err, "error to retain PV %s", restorePV.Name)
curLog.WithField("restore PV", restorePV.Name).WithField("retained", (retained != nil)).Info("Restore PV is retained")
defer func() {
if retained != nil {
curLog.WithField("retained PV", retained.Name).Info("Deleting retained PV on error")
kube.DeletePVIfAny(ctx, e.kubeClient.CoreV1(), retained.Name, curLog)
if retained != nil {
restorePV = retained
err = kube.EnsureDeletePod(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to delete restore pod %s", restorePodName)
err = kube.EnsureDeletePVC(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to delete restore PVC %s", restorePVCName)
curLog.WithField("restore PVC", restorePVCName).Info("Restore PVC is deleted")
_, err = kube.RebindPVC(ctx, e.kubeClient.CoreV1(), targetPVC, restorePV.Name)
if err != nil {
return errors.Wrapf(err, "error to rebind target PVC %s/%s to %s", targetPVC.Namespace, targetPVC.Name, restorePV.Name)
curLog.WithField("tartet PVC", fmt.Sprintf("%s/%s", targetPVC.Namespace, targetPVC.Name)).WithField("restore PV", restorePV.Name).Info("Target PVC is rebound to restore PV")
var matchLabel map[string]string
if targetPVC.Spec.Selector != nil {
matchLabel = targetPVC.Spec.Selector.MatchLabels
restorePVName := restorePV.Name
restorePV, err = kube.ResetPVBinding(ctx, e.kubeClient.CoreV1(), restorePV, matchLabel)
if err != nil {
return errors.Wrapf(err, "error to reset binding info for restore PV %s", restorePVName)
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is rebound")
restorePV, err = kube.WaitPVBound(ctx, e.kubeClient.CoreV1(), restorePV.Name, targetPVC.Name, targetPVC.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to wait restore PV bound, restore PV %s", restorePVName)
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is ready")
retained = nil
_, err = kube.SetPVReclaimPolicy(ctx, e.kubeClient.CoreV1(), restorePV, orgReclaim)
if err != nil {
curLog.WithField("restore PV", restorePV.Name).WithError(err).Warn("Restore PV's reclaim policy is not restored")
} else {
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV's reclaim policy is restored")
return nil
func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
var gracePeriod int64 = 0
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: restorePodName,
Namespace: ownerObject.Namespace,
OwnerReferences: []metav1.OwnerReference{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
Labels: label,
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Name: restorePodName,
Image: "alpine:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sleep", "infinity"},
VolumeMounts: []corev1.VolumeMount{{
Name: restorePVCName,
MountPath: "/" + restorePVCName,
TerminationGracePeriodSeconds: &gracePeriod,
Volumes: []corev1.Volume{{
Name: restorePVCName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: restorePVCName,
NodeName: selectedNode,
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
func (e *genericRestoreExposer) createRestorePVC(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim, selectedNode string) (*corev1.PersistentVolumeClaim, error) {
restorePVCName := ownerObject.Name
pvcObj := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ownerObject.Namespace,
Name: restorePVCName,
Labels: targetPVC.Labels,
Annotations: targetPVC.Annotations,
OwnerReferences: []metav1.OwnerReference{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: targetPVC.Spec.AccessModes,
StorageClassName: targetPVC.Spec.StorageClassName,
VolumeMode: targetPVC.Spec.VolumeMode,
Resources: targetPVC.Spec.Resources,
if selectedNode != "" {
pvcObj.Annotations = map[string]string{
kube.KubeAnnSelectedNode: selectedNode,
return e.kubeClient.CoreV1().PersistentVolumeClaims(pvcObj.Namespace).Create(ctx, pvcObj, metav1.CreateOptions{})
@ -0,0 +1,376 @@
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package exposer
import (
metav1 ""
velerov1 ""
velerotest ""
corev1api ""
clientTesting ""
func TestRestoreExpose(t *testing.T) {
restore := &velerov1.Restore{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Restore",
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
UID: "fake-uid",
targetPVCObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-target-pvc",
tests := []struct {
name string
kubeClientObj []runtime.Object
ownerRestore *velerov1.Restore
targetPVCName string
sourceNamespace string
kubeReactors []reactor
err string
name: "wait target pvc consumed fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
err: "error to wait target PVC consumed, fake-ns/fake-target-pvc: error to wait for PVC: error to get pvc fake-ns/fake-target-pvc: persistentvolumeclaims \"fake-target-pvc\" not found",
name: "create restore pod fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "create",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
err: "error to create restore pod: fake-create-error",
name: "create restore pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "create",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
err: "error to create restore pvc: fake-create-error",
for _, test := range tests {
t.Run(, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
exposer := genericRestoreExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
var ownerObject corev1api.ObjectReference
if test.ownerRestore != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerRestore.Kind,
Namespace: test.ownerRestore.Namespace,
Name: test.ownerRestore.Name,
UID: test.ownerRestore.UID,
APIVersion: test.ownerRestore.APIVersion,
err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, time.Millisecond)
assert.EqualError(t, err, test.err)
func TestRebindVolume(t *testing.T) {
restore := &velerov1.Restore{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Restore",
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
UID: "fake-uid",
targetPVCObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-target-pvc",
restorePVCObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
Spec: corev1api.PersistentVolumeClaimSpec{
VolumeName: "fake-restore-pv",
restorePVObj := &corev1api.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-restore-pv",
Spec: corev1api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1api.PersistentVolumeReclaimDelete,
restorePod := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
hookCount := 0
tests := []struct {
name string
kubeClientObj []runtime.Object
ownerRestore *velerov1.Restore
targetPVCName string
sourceNamespace string
kubeReactors []reactor
err string
name: "get target pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
err: "error to get target PVC fake-ns/fake-target-pvc: persistentvolumeclaims \"fake-target-pvc\" not found",
name: "wait restore pvc bound fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
err: "error to get PV from restore PVC fake-restore: error to wait for rediness of PVC: error to get pvc velero/fake-restore: persistentvolumeclaims \"fake-restore\" not found",
name: "retain target pv fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "patch",
resource: "persistentvolumes",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-patch-error")
err: "error to retain PV fake-restore-pv: error patching PV: fake-patch-error",
name: "delete restore pod fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "delete",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-delete-error")
err: "error to delete restore pod fake-restore: error to delete pod fake-restore: fake-delete-error",
name: "delete restore pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "delete",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-delete-error")
err: "error to delete restore PVC fake-restore: error to delete pvc fake-restore: fake-delete-error",
name: "rebind target pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "patch",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-patch-error")
err: "error to rebind target PVC fake-ns/fake-target-pvc to fake-restore-pv: error patching PVC: fake-patch-error",
name: "reset pv binding fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
kubeReactors: []reactor{
verb: "patch",
resource: "persistentvolumes",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
if hookCount == 0 {
return false, nil, nil
} else {
return true, nil, errors.New("fake-patch-error")
err: "error to reset binding info for restore PV fake-restore-pv: error patching PV: fake-patch-error",
name: "wait restore PV bound fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
err: "error to wait restore PV bound, restore PV fake-restore-pv: error to wait for bound of PV: timed out waiting for the condition",
for _, test := range tests {
t.Run(, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
exposer := genericRestoreExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
var ownerObject corev1api.ObjectReference
if test.ownerRestore != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerRestore.Kind,
Namespace: test.ownerRestore.Namespace,
Name: test.ownerRestore.Name,
UID: test.ownerRestore.UID,
APIVersion: test.ownerRestore.APIVersion,
hookCount = 0
err := exposer.RebindVolume(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, time.Millisecond)
assert.EqualError(t, err, test.err)
@ -17,12 +17,14 @@ package kube
import (
corev1api ""
apierrors ""
metav1 ""
corev1client ""
@ -81,3 +83,30 @@ func DeletePodIfAny(ctx context.Context, podGetter corev1client.CoreV1Interface,
// EnsureDeletePod asserts the existence of a pod by name, deletes it and waits for its disappearance and returns errors on any failure
func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, timeout time.Duration) error {
err := podGetter.Pods(namespace).Delete(ctx, pod, metav1.DeleteOptions{})
if err != nil {
return errors.Wrapf(err, "error to delete pod %s", pod)
err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
_, err := podGetter.Pods(namespace).Get(ctx, pod, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
return false, errors.Wrapf(err, "error to get pod %s", pod)
return false, nil
if err != nil {
return errors.Wrapf(err, "error to assure pod is deleted, %s", pod)
return nil
@ -0,0 +1,93 @@
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package kube
import (
corev1api ""
metav1 ""
clientTesting ""
func TestEnsureDeletePod(t *testing.T) {
podObject := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-pod",
tests := []struct {
name string
clientObj []runtime.Object
podName string
namespace string
reactors []reactor
err string
name: "delete fail",
podName: "fake-pod",
namespace: "fake-ns",
err: "error to delete pod fake-pod: pods \"fake-pod\" not found",
name: "wait fail",
podName: "fake-pod",
namespace: "fake-ns",
clientObj: []runtime.Object{podObject},
reactors: []reactor{
verb: "get",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
err: "error to assure pod is deleted, fake-pod: error to get pod fake-pod: fake-get-error",
for _, test := range tests {
t.Run(, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.clientObj...)
for _, reactor := range test.reactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
var kubeClient kubernetes.Interface = fakeKubeClient
err := EnsureDeletePod(context.Background(), kubeClient.CoreV1(), test.podName, test.namespace, time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
@ -18,17 +18,23 @@ package kube
import (
jsonpatch ""
corev1api ""
apierrors ""
metav1 ""
corev1client ""
storagev1api ""
storagev1 ""
const (
@ -77,3 +83,222 @@ func WaitPVCBound(ctx context.Context, pvcGetter corev1client.CoreV1Interface,
return pv, err
// DeletePVIfAny deletes a PV by name if it exists, and log an error when the deletion fails
func DeletePVIfAny(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvName string, log logrus.FieldLogger) {
err := pvGetter.PersistentVolumes().Delete(ctx, pvName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debugf("Abort deleting PV, it doesn't exist, %s", pvName)
} else {
log.WithError(err).Errorf("Failed to delete PV %s", pvName)
// EnsureDeletePVC asserts the existence of a PVC by name, deletes it and waits for its disappearance and returns errors on any failure
func EnsureDeletePVC(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string, timeout time.Duration) error {
err := pvcGetter.PersistentVolumeClaims(namespace).Delete(ctx, pvc, metav1.DeleteOptions{})
if err != nil {
return errors.Wrapf(err, "error to delete pvc %s", pvc)
err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
_, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
return false, errors.Wrapf(err, "error to get pvc %s", pvc)
return false, nil
if err != nil {
return errors.Wrapf(err, "error to retrieve pvc info for %s", pvc)
return nil
// RebindPVC rebinds a PVC by modifying its VolumeName to the specific PV
func RebindPVC(ctx context.Context, pvcGetter corev1client.CoreV1Interface,
pvc *corev1api.PersistentVolumeClaim, pv string) (*corev1api.PersistentVolumeClaim, error) {
origBytes, err := json.Marshal(pvc)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original PVC")
updated := pvc.DeepCopy()
updated.Spec.VolumeName = pv
delete(updated.Annotations, KubeAnnBindCompleted)
delete(updated.Annotations, KubeAnnBoundByController)
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated PV")
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PV")
updated, err = pvcGetter.PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PVC")
return updated, nil
// ResetPVBinding resets the binding info of a PV and adds the required labels so as to make it ready for binding
func ResetPVBinding(ctx context.Context, pvGetter corev1client.CoreV1Interface, pv *corev1api.PersistentVolume, labels map[string]string) (*corev1api.PersistentVolume, error) {
origBytes, err := json.Marshal(pv)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original PV")
updated := pv.DeepCopy()
updated.Spec.ClaimRef = nil
delete(updated.Annotations, KubeAnnBoundByController)
if labels != nil {
if updated.Labels == nil {
updated.Labels = make(map[string]string)
for k, v := range labels {
if _, ok := updated.Labels[k]; !ok {
updated.Labels[k] = v
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated PV")
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PV")
updated, err = pvGetter.PersistentVolumes().Patch(ctx, pv.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PV")
return updated, nil
// SetPVReclaimPolicy sets the specified reclaim policy to a PV
func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interface, pv *corev1api.PersistentVolume,
policy corev1api.PersistentVolumeReclaimPolicy) (*corev1api.PersistentVolume, error) {
if pv.Spec.PersistentVolumeReclaimPolicy == policy {
return nil, nil
origBytes, err := json.Marshal(pv)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original PV")
updated := pv.DeepCopy()
updated.Spec.PersistentVolumeReclaimPolicy = policy
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated PV")
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PV")
updated, err = pvGetter.PersistentVolumes().Patch(ctx, pv.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PV")
return updated, nil
// WaitPVCConsumed waits for a PVC to be consumed by a pod so that the selected node is set by the pod scheduling; or does
// nothing if the consuming doesn't affect the PV provision.
// The latest PVC and the selected node will be returned.
func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string,
storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) {
selectedNode := ""
var updated *corev1api.PersistentVolumeClaim
var storageClass *storagev1api.StorageClass
err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
tmpPVC, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc)
if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
if storageClass != nil {
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
if selectedNode == "" {
return false, nil
updated = tmpPVC
return true, nil
if err != nil {
return "", nil, errors.Wrap(err, "error to wait for PVC")
return selectedNode, updated, err
// WaitPVBound wait for binding of a PV specified by name and returns the bound PV object
func WaitPVBound(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvName string, pvcName string, pvcNamespace string, timeout time.Duration) (*corev1api.PersistentVolume, error) {
var updated *corev1api.PersistentVolume
err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
tmpPV, err := pvGetter.PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get pv %s", pvName))
if tmpPV.Spec.ClaimRef == nil {
return false, nil
if tmpPV.Spec.ClaimRef.Name != pvcName {
return false, nil
if tmpPV.Spec.ClaimRef.Namespace != pvcNamespace {
return false, nil
updated = tmpPV
return true, nil
if err != nil {
return nil, errors.Wrap(err, "error to wait for bound of PV")
} else {
return updated, nil
@ -28,6 +28,7 @@ import (
corev1api ""
storagev1api ""
clientTesting ""
@ -129,3 +130,157 @@ func TestWaitPVCBound(t *testing.T) {
func TestWaitPVCConsumed(t *testing.T) {
storageClass := "fake-storage-class"
bindModeImmediate := storagev1api.VolumeBindingImmediate
bindModeWait := storagev1api.VolumeBindingWaitForFirstConsumer
pvcObject := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc-1",
pvcObjectWithSC := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc-2",
Spec: corev1api.PersistentVolumeClaimSpec{
StorageClassName: &storageClass,
scObjWithoutBindMode := &storagev1api.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-storage-class",
scObjWaitBind := &storagev1api.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-storage-class",
VolumeBindingMode: &bindModeWait,
scObjWithImmidateBinding := &storagev1api.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-storage-class",
VolumeBindingMode: &bindModeImmediate,
pvcObjectWithSCAndAnno := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc-3",
Annotations: map[string]string{"": "fake-node-1"},
Spec: corev1api.PersistentVolumeClaimSpec{
StorageClassName: &storageClass,
tests := []struct {
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expectedPVC *corev1api.PersistentVolumeClaim
selectedNode string
err string
name: "get pvc error",
pvcName: "fake-pvc",
pvcNamespace: "fake-namespace",
err: "error to wait for PVC: error to get pvc fake-namespace/fake-pvc: persistentvolumeclaims \"fake-pvc\" not found",
name: "success when no sc",
pvcName: "fake-pvc-1",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
expectedPVC: pvcObject,
name: "get sc fail",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
err: "error to wait for PVC: error to get storage class fake-storage-class: \"fake-storage-class\" not found",
name: "success on sc without binding mode",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
expectedPVC: pvcObjectWithSC,
name: "success on sc without immediate binding mode",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
expectedPVC: pvcObjectWithSC,
name: "pvc annotation miss",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
err: "error to wait for PVC: timed out waiting for the condition",
name: "success on sc without wait binding mode",
pvcName: "fake-pvc-3",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
expectedPVC: pvcObjectWithSCAndAnno,
selectedNode: "fake-node-1",
for _, test := range tests {
t.Run(, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
var kubeClient kubernetes.Interface = fakeKubeClient
selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expectedPVC, pvc)
assert.Equal(t, test.selectedNode, selectedNode)
@ -47,6 +47,7 @@ const (
KubeAnnBoundByController = ""
KubeAnnDynamicallyProvisioned = ""
KubeAnnMigratedTo = ""
KubeAnnSelectedNode = ""
// NamespaceAndName returns a string in the format <namespace>/<name>
Reference in New Issue