Merge pull request #8550 from Lyndon-Li/restore-pvc-ignore-wait-for-first-consumer
Issue 8044: generic restore - allow to ignore delay binding for WaitForFirstConsumer pull/8385/head
commit
be5f56ab18
|
@ -0,0 +1 @@
|
|||
Fix issue #8044: allow users to ignore delayed binding of the data mover's restorePVC when it is in WaitForFirstConsumer mode
|
|
@ -353,7 +353,13 @@ func (s *nodeAgentServer) run() {
|
|||
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
|
||||
}
|
||||
|
||||
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
|
||||
var restorePVCConfig nodeagent.RestorePVC
|
||||
if s.dataPathConfigs != nil && s.dataPathConfigs.RestorePVCConfig != nil {
|
||||
restorePVCConfig = *s.dataPathConfigs.RestorePVCConfig
|
||||
s.logger.Infof("Using customized restorePVC config %v", restorePVCConfig)
|
||||
}
|
||||
|
||||
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, restorePVCConfig, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
|
||||
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
|
||||
s.logger.WithError(err).Fatal("Unable to create the data download controller")
|
||||
}
|
||||
|
|
|
@ -64,13 +64,15 @@ type DataDownloadReconciler struct {
|
|||
restoreExposer exposer.GenericRestoreExposer
|
||||
nodeName string
|
||||
dataPathMgr *datapath.Manager
|
||||
restorePVCConfig nodeagent.RestorePVC
|
||||
podResources v1.ResourceRequirements
|
||||
preparingTimeout time.Duration
|
||||
metrics *metrics.ServerMetrics
|
||||
}
|
||||
|
||||
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
|
||||
podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
|
||||
restorePVCConfig nodeagent.RestorePVC, podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration,
|
||||
logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
|
||||
return &DataDownloadReconciler{
|
||||
client: client,
|
||||
kubeClient: kubeClient,
|
||||
|
@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl
|
|||
Clock: &clock.RealClock{},
|
||||
nodeName: nodeName,
|
||||
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
|
||||
restorePVCConfig: restorePVCConfig,
|
||||
dataPathMgr: dataPathMgr,
|
||||
podResources: podResources,
|
||||
preparingTimeout: preparingTimeout,
|
||||
|
@ -194,7 +197,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
|||
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
|
||||
// but the pod may not be on the same node as the current controller, so we need to return here.
|
||||
// Then only the controller that is on the same node as the pod can do the rest of the work.
|
||||
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
|
||||
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposer.GenericRestoreExposeParam{
|
||||
TargetPVCName: dd.Spec.TargetVolume.PVC,
|
||||
SourceNamespace: dd.Spec.TargetVolume.Namespace,
|
||||
HostingPodLabels: hostingPodLabels,
|
||||
Resources: r.podResources,
|
||||
ExposeTimeout: dd.Spec.OperationTimeout.Duration,
|
||||
RestorePVCConfig: r.restorePVCConfig,
|
||||
})
|
||||
if err != nil {
|
||||
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
|
|
|
@ -50,6 +50,7 @@ import (
|
|||
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
|
||||
"github.com/vmware-tanzu/velero/pkg/exposer"
|
||||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
|
||||
|
@ -140,7 +141,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
|
|||
|
||||
dataPathMgr := datapath.NewManager(1)
|
||||
|
||||
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
|
||||
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nodeagent.RestorePVC{}, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
|
||||
}
|
||||
|
||||
func TestDataDownloadReconcile(t *testing.T) {
|
||||
|
@ -959,7 +960,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
|
|||
return dt.resumeErr
|
||||
}
|
||||
|
||||
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
|
||||
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, exposer.GenericRestoreExposeParam) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -35,10 +35,31 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
// GenericRestoreExposeParam defines the input parameters for a generic restore expose
|
||||
type GenericRestoreExposeParam struct {
|
||||
// TargetPVCName is the target volume name to be restored
|
||||
TargetPVCName string
|
||||
|
||||
// SourceNamespace is the original namespace of the volume that the snapshot is taken for
|
||||
SourceNamespace string
|
||||
|
||||
// HostingPodLabels is the labels that are going to apply to the hosting pod
|
||||
HostingPodLabels map[string]string
|
||||
|
||||
// Resources defines the resource requirements of the hosting pod
|
||||
Resources corev1.ResourceRequirements
|
||||
|
||||
// ExposeTimeout specifies the timeout for the entire expose process
|
||||
ExposeTimeout time.Duration
|
||||
|
||||
// RestorePVCConfig is the config for restorePVC (intermediate PVC) of generic restore
|
||||
RestorePVCConfig nodeagent.RestorePVC
|
||||
}
|
||||
|
||||
// GenericRestoreExposer is the interface for a generic restore exposer
|
||||
type GenericRestoreExposer interface {
|
||||
// Expose starts the restore expose process; the expose process may take a long time
|
||||
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error
|
||||
Expose(context.Context, corev1.ObjectReference, GenericRestoreExposeParam) error
|
||||
|
||||
// GetExposed polls the status of the expose.
|
||||
// If the expose is accessible by the current caller, it waits for the expose to be ready and returns the expose result.
|
||||
|
@ -74,25 +95,25 @@ type genericRestoreExposer struct {
|
|||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
|
||||
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param GenericRestoreExposeParam) error {
|
||||
curLog := e.log.WithFields(logrus.Fields{
|
||||
"owner": ownerObject.Name,
|
||||
"target PVC": targetPVCName,
|
||||
"source namespace": sourceNamespace,
|
||||
"target PVC": param.TargetPVCName,
|
||||
"source namespace": param.SourceNamespace,
|
||||
})
|
||||
|
||||
selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout)
|
||||
selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), param.TargetPVCName, param.SourceNamespace, e.kubeClient.StorageV1(), param.ExposeTimeout, param.RestorePVCConfig.IgnoreDelayBinding)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName)
|
||||
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", param.SourceNamespace, param.TargetPVCName)
|
||||
}
|
||||
|
||||
curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")
|
||||
curLog.WithField("target PVC", param.TargetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")
|
||||
|
||||
if kube.IsPVCBound(targetPVC) {
|
||||
return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
|
||||
return errors.Errorf("Target PVC %s/%s has already been bound, abort", param.SourceNamespace, param.TargetPVCName)
|
||||
}
|
||||
|
||||
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
|
||||
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, param.ExposeTimeout, param.HostingPodLabels, selectedNode, param.Resources)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to create restore pod")
|
||||
}
|
||||
|
|
|
@ -180,7 +180,12 @@ func TestRestoreExpose(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
|
||||
err := exposer.Expose(context.Background(), ownerObject, GenericRestoreExposeParam{
|
||||
TargetPVCName: test.targetPVCName,
|
||||
SourceNamespace: test.sourceNamespace,
|
||||
HostingPodLabels: map[string]string{},
|
||||
Resources: corev1.ResourceRequirements{},
|
||||
ExposeTimeout: time.Millisecond})
|
||||
assert.EqualError(t, err, test.err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -44,17 +44,17 @@ func (_m *GenericRestoreExposer) DiagnoseExpose(_a0 context.Context, _a1 v1.Obje
|
|||
return r0
|
||||
}
|
||||
|
||||
// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6
|
||||
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 v1.ResourceRequirements, _a6 time.Duration) error {
|
||||
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
|
||||
// Expose provides a mock function with given fields: _a0, _a1, _a2
|
||||
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 exposer.GenericRestoreExposeParam) error {
|
||||
ret := _m.Called(_a0, _a1, _a2)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for Expose")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, v1.ResourceRequirements, time.Duration) error); ok {
|
||||
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
|
||||
if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, exposer.GenericRestoreExposeParam) error); ok {
|
||||
r0 = rf(_a0, _a1, _a2)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
|
|
@ -81,6 +81,11 @@ type BackupPVC struct {
|
|||
SPCNoRelabeling bool `json:"spcNoRelabeling,omitempty"`
|
||||
}
|
||||
|
||||
type RestorePVC struct {
|
||||
// IgnoreDelayBinding indicates whether to ignore delayed binding of the restorePVC when it is in WaitForFirstConsumer mode
|
||||
IgnoreDelayBinding bool `json:"ignoreDelayBinding,omitempty"`
|
||||
}
|
||||
|
||||
type Configs struct {
|
||||
// LoadConcurrency is the config for data path load concurrency per node.
|
||||
LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
|
||||
|
@ -91,6 +96,9 @@ type Configs struct {
|
|||
// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
|
||||
BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"`
|
||||
|
||||
// RestorePVCConfig is the config for restorePVC (intermediate PVC) of generic restore
|
||||
RestorePVCConfig *RestorePVC `json:"restorePVC,omitempty"`
|
||||
|
||||
// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
|
||||
PodResources *kube.PodResources `json:"podResources,omitempty"`
|
||||
}
|
||||
|
|
|
@ -271,7 +271,7 @@ func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interfa
|
|||
// nothing if the consuming doesn't affect the PV provision.
|
||||
// The latest PVC and the selected node will be returned.
|
||||
func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string,
|
||||
storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) {
|
||||
storageClient storagev1.StorageV1Interface, timeout time.Duration, ignoreConsume bool) (string, *corev1api.PersistentVolumeClaim, error) {
|
||||
selectedNode := ""
|
||||
var updated *corev1api.PersistentVolumeClaim
|
||||
var storageClass *storagev1api.StorageClass
|
||||
|
@ -282,18 +282,20 @@ func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface
|
|||
return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc)
|
||||
}
|
||||
|
||||
if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
|
||||
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
|
||||
if !ignoreConsume {
|
||||
if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
|
||||
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if storageClass != nil {
|
||||
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
|
||||
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
|
||||
if selectedNode == "" {
|
||||
return false, nil
|
||||
if storageClass != nil {
|
||||
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
|
||||
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
|
||||
if selectedNode == "" {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -189,14 +189,15 @@ func TestWaitPVCConsumed(t *testing.T) {
|
|||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
pvcName string
|
||||
pvcNamespace string
|
||||
kubeClientObj []runtime.Object
|
||||
kubeReactors []reactor
|
||||
expectedPVC *corev1api.PersistentVolumeClaim
|
||||
selectedNode string
|
||||
err string
|
||||
name string
|
||||
pvcName string
|
||||
pvcNamespace string
|
||||
kubeClientObj []runtime.Object
|
||||
kubeReactors []reactor
|
||||
expectedPVC *corev1api.PersistentVolumeClaim
|
||||
selectedNode string
|
||||
ignoreWaitForFirstConsumer bool
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "get pvc error",
|
||||
|
@ -213,6 +214,16 @@ func TestWaitPVCConsumed(t *testing.T) {
|
|||
},
|
||||
expectedPVC: pvcObject,
|
||||
},
|
||||
{
|
||||
name: "success when ignore wait for first consumer",
|
||||
pvcName: "fake-pvc-2",
|
||||
pvcNamespace: "fake-namespace",
|
||||
ignoreWaitForFirstConsumer: true,
|
||||
kubeClientObj: []runtime.Object{
|
||||
pvcObjectWithSC,
|
||||
},
|
||||
expectedPVC: pvcObjectWithSC,
|
||||
},
|
||||
{
|
||||
name: "get sc fail",
|
||||
pvcName: "fake-pvc-2",
|
||||
|
@ -275,7 +286,7 @@ func TestWaitPVCConsumed(t *testing.T) {
|
|||
|
||||
var kubeClient kubernetes.Interface = fakeKubeClient
|
||||
|
||||
selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond)
|
||||
selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond, test.ignoreWaitForFirstConsumer)
|
||||
|
||||
if err != nil {
|
||||
assert.EqualError(t, err, test.err)
|
||||
|
|
Loading…
Reference in New Issue