From 38c927711a0dc7d4644321b966e000ff699a96a6 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Tue, 20 May 2025 20:39:05 +0800 Subject: [PATCH 1/2] dm controller refactor for cancel Signed-off-by: Lyndon-Li --- changelogs/unreleased/8952-Lyndon-Li | 1 + pkg/builder/data_download_builder.go | 12 + pkg/builder/data_upload_builder.go | 12 + pkg/controller/data_download_controller.go | 394 ++++++------ .../data_download_controller_test.go | 531 ++++++++-------- pkg/controller/data_upload_controller.go | 381 ++++++------ pkg/controller/data_upload_controller_test.go | 567 +++++++++--------- 7 files changed, 1039 insertions(+), 859 deletions(-) create mode 100644 changelogs/unreleased/8952-Lyndon-Li diff --git a/changelogs/unreleased/8952-Lyndon-Li b/changelogs/unreleased/8952-Lyndon-Li new file mode 100644 index 000000000..c360537b9 --- /dev/null +++ b/changelogs/unreleased/8952-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8534, refactor dm controllers to tolerate cancel request in more cases, e.g., node restart, node drain \ No newline at end of file diff --git a/pkg/builder/data_download_builder.go b/pkg/builder/data_download_builder.go index 5b23a9dcc..20cea868f 100644 --- a/pkg/builder/data_download_builder.go +++ b/pkg/builder/data_download_builder.go @@ -165,3 +165,15 @@ func (d *DataDownloadBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) d.object.Status.AcceptedTimestamp = acceptedTimestamp return d } + +// Finalizers sets the DataDownload's Finalizers. +func (d *DataDownloadBuilder) Finalizers(finalizers []string) *DataDownloadBuilder { + d.object.Finalizers = finalizers + return d +} + +// Message sets the DataDownload's Message. +func (d *DataDownloadBuilder) Message(msg string) *DataDownloadBuilder { + d.object.Status.Message = msg + return d +} diff --git a/pkg/builder/data_upload_builder.go b/pkg/builder/data_upload_builder.go index b77566bf6..48b4fa1d8 100644 --- a/pkg/builder/data_upload_builder.go +++ b/pkg/builder/data_upload_builder.go @@ -168,3 +168,15 @@ func (d *DataUploadBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) *D d.object.Status.AcceptedTimestamp = acceptedTimestamp return d } + +// Finalizers sets the DataUpload's Finalizers. +func (d *DataUploadBuilder) Finalizers(finalizers []string) *DataUploadBuilder { + d.object.Finalizers = finalizers + return d +} + +// Message sets the DataUpload's Message. +func (d *DataUploadBuilder) Message(msg string) *DataUploadBuilder { + d.object.Status.Message = msg + return d +} diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index a7f8f8754..e6b3f8872 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -56,36 +56,38 @@ import ( // DataDownloadReconciler reconciles a DataDownload object type DataDownloadReconciler struct { - client client.Client - kubeClient kubernetes.Interface - mgr manager.Manager - logger logrus.FieldLogger - Clock clock.WithTickerAndDelayedExecution - restoreExposer exposer.GenericRestoreExposer - nodeName string - dataPathMgr *datapath.Manager - restorePVCConfig nodeagent.RestorePVC - podResources corev1api.ResourceRequirements - preparingTimeout time.Duration - metrics *metrics.ServerMetrics + client client.Client + kubeClient kubernetes.Interface + mgr manager.Manager + logger logrus.FieldLogger + Clock clock.WithTickerAndDelayedExecution + restoreExposer exposer.GenericRestoreExposer + nodeName string + dataPathMgr *datapath.Manager + restorePVCConfig nodeagent.RestorePVC + podResources corev1api.ResourceRequirements + preparingTimeout time.Duration + metrics *metrics.ServerMetrics + cancelledDataDownload map[string]time.Time } func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, restorePVCConfig nodeagent.RestorePVC, podResources corev1api.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ - client: client, - kubeClient: kubeClient, - mgr: mgr, - logger: logger.WithField("controller", "DataDownload"), - Clock: &clock.RealClock{}, - nodeName: nodeName, - restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), - restorePVCConfig: restorePVCConfig, - dataPathMgr: dataPathMgr, - podResources: podResources, - preparingTimeout: preparingTimeout, - metrics: metrics, + client: client, + kubeClient: kubeClient, + mgr: mgr, + logger: logger.WithField("controller", "DataDownload"), + Clock: &clock.RealClock{}, + nodeName: nodeName, + restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), + restorePVCConfig: restorePVCConfig, + dataPathMgr: dataPathMgr, + podResources: podResources, + preparingTimeout: preparingTimeout, + metrics: metrics, + cancelledDataDownload: make(map[string]time.Time), } } @@ -118,42 +120,90 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } - if r.restoreExposer == nil { - return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log) - } - - // Add finalizer // Logic for clear resources when datadownload been deleted - if dd.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning - if !isDataDownloadInFinalState(dd) && !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { - succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { + if !isDataDownloadInFinalState(dd) { + if !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { + if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dd *velerov2alpha1api.DataDownload) bool { + if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { + return false + } + controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer) - }) - if err != nil { - log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) + + return true + }); err != nil { + log.WithError(err).Errorf("failed to add finalizer for dd %s/%s", dd.Namespace, dd.Name) return ctrl.Result{}, err - } else if !succeeded { - log.Warnf("failed to add finalizer for %s/%s and will requeue later", dd.Namespace, dd.Name) - return ctrl.Result{Requeue: true}, nil + } + + return ctrl.Result{}, nil + } + + if !dd.DeletionTimestamp.IsZero() { + if !dd.Spec.Cancel { + // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism + // to help clear up resources instead of clear them directly in case of some conflict with Expose action + log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase) + + if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool { + if dataDownload.Spec.Cancel { + return false + } + + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = "Cancel datadownload because it is being deleted" + + return true + }); err != nil { + log.WithError(err).Errorf("failed to set cancel flag for dd %s/%s", dd.Namespace, dd.Name) + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil } } - } else if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) { - // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism - // to help clear up resources instead of clear them directly in case of some conflict with Expose action - log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase) + } else { + delete(r.cancelledDataDownload, dd.Name) - if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool { - if dataDownload.Spec.Cancel { - return false + // put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status + // instead of intermediate state. + // remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up + // also in final status cr won't block the direct delete of the velero namespace + if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { + if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dd *velerov2alpha1api.DataDownload) bool { + if !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { + return false + } + + controllerutil.RemoveFinalizer(dd, DataUploadDownloadFinalizer) + + return true + }); err != nil { + log.WithError(err).Error("error to remove finalizer") + return ctrl.Result{}, err } - dataDownload.Spec.Cancel = true - dataDownload.Status.Message = "Cancel datadownload because it is being deleted" + return ctrl.Result{}, nil + } + } - return true - }); err != nil { - log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) - return ctrl.Result{}, err + if dd.Spec.Cancel { + if spotted, found := r.cancelledDataDownload[dd.Name]; !found { + r.cancelledDataDownload[dd.Name] = r.Clock.Now() + } else { + delay := cancelDelayOthers + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + delay = cancelDelayInProgress + } + + if time.Since(spotted) > delay { + log.Infof("Data download %s is canceled in Phase %s but not handled in rasonable time", dd.GetName(), dd.Status.Phase) + if r.tryCancelDataDownload(ctx, dd, "") { + delete(r.cancelledDataDownload, dd.Name) + } + + return ctrl.Result{}, nil + } } } @@ -167,7 +217,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request accepted, err := r.acceptDataDownload(ctx, dd) if err != nil { - return r.errorOut(ctx, dd, err, "error to accept the data download", log) + return ctrl.Result{}, errors.Wrapf(err, "error accepting the data download %s", dd.Name) } if !accepted { @@ -193,44 +243,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // And then only the controller who is in the same node could do the rest work. err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposeParam) if err != nil { - if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil { - if !apierrors.IsNotFound(err) { - return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") - } - } - if isDataDownloadInFinalState(dd) { - log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err) - r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - return ctrl.Result{}, nil - } else { - return r.errorOut(ctx, dd, err, "error to expose snapshot", log) - } + return r.errorOut(ctx, dd, err, "error to expose snapshot", log) } log.Info("Restore is exposed") - // we need to get CR again for it may canceled by datadownload controller on other - // nodes when doing expose action, if detectd cancel action we need to clear up the internal - // resources created by velero during backup. - if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil { - if apierrors.IsNotFound(err) { - log.Debug("Unable to find datadownload") - return ctrl.Result{}, nil - } - return ctrl.Result{}, errors.Wrap(err, "getting datadownload") - } - // we need to clean up resources as resources created in Expose it may later than cancel action or prepare time - // and need to clean up resources again - if isDataDownloadInFinalState(dd) { - r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - } - return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { - if dd.Spec.Cancel { - log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.tryCancelAcceptedDataDownload(ctx, dd, "") - } else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil { - r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr)) + if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil { + r.tryCancelDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr)) log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) } else if dd.Status.AcceptedTimestamp != nil { if time.Since(dd.Status.AcceptedTimestamp.Time) >= r.preparingTimeout { @@ -240,7 +260,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { - log.Info("Data download is prepared") + log.Infof("Data download is prepared and should be processed by %s (%s)", dd.Status.Node, r.nodeName) + + if dd.Status.Node != r.nodeName { + return ctrl.Result{}, nil + } if dd.Spec.Cancel { log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) @@ -259,13 +283,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request if err != nil { return r.errorOut(ctx, dd, err, "restore exposer is not ready", log) } else if result == nil { - log.Debug("Get empty restore exposer") - return ctrl.Result{}, nil + return r.errorOut(ctx, dd, errors.New("no expose result is available for the current node"), "exposed snapshot is not ready", log) } log.Info("Restore PVC is ready and creating data path routine") - // Need to first create file system BR and get data path instance then update data upload status + // Need to first create file system BR and get data path instance then update data download status callbacks := datapath.Callbacks{ OnCompleted: r.OnDataDownloadCompleted, OnFailed: r.OnDataDownloadFailed, @@ -292,16 +315,30 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } // Update status to InProgress - original := dd.DeepCopy() - dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress - dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} - if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name) + terminated := false + if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool { + if isDataDownloadInFinalState(dd) { + terminated = true + return false + } + + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress + dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + + return true + }); err != nil { + log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will data path close and retry", dd.Name) r.closeDataPath(ctx, dd.Name) return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } + if terminated { + log.Warnf("datadownload %s is terminated during transition from prepared", dd.Name) + r.closeDataPath(ctx, dd.Name) + return ctrl.Result{}, nil + } + log.Info("Data download is marked as in progress") if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil { @@ -313,45 +350,41 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { - log.Info("Data download is in progress") if dd.Spec.Cancel { - log.Info("Data download is being canceled") + if dd.Status.Node != r.nodeName { + return ctrl.Result{}, nil + } + + log.Info("In progress data download is being canceled") + asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name) if asyncBR == nil { - if r.nodeName == dd.Status.Node { - r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) - } else { - log.Info("Data path is not started in this node and will not canceled by current node") - } + r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) return ctrl.Result{}, nil } // Update status to Canceling. - original := dd.DeepCopy() - dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling - if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating data download status") + if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool { + if isDataDownloadInFinalState(dd) { + log.Warnf("datadownload %s is terminated, abort setting it to canceling", dd.Name) + return false + } + + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling + return true + }); err != nil { + log.WithError(err).Error("error updating data download into canceling status") return ctrl.Result{}, err } + asyncBR.Cancel() return ctrl.Result{}, nil } - return ctrl.Result{}, nil - } else { - // put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status - // instead of intermediate state - // remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up - // also in final status cr won't block the direct delete of the velero namespace - if isDataDownloadInFinalState(dd) && controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { - original := dd.DeepCopy() - controllerutil.RemoveFinalizer(dd, DataUploadDownloadFinalizer) - if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error to remove finalizer") - } - } return ctrl.Result{}, nil } + + return ctrl.Result{}, nil } func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { @@ -451,15 +484,14 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na log.WithError(err).Error("error updating data download status") } else { r.metrics.RegisterDataDownloadCancel(r.nodeName) + delete(r.cancelledDataDownload, dd.Name) } } } -func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { +func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) bool { log := r.logger.WithField("datadownload", dd.Name) - log.Warn("Accepted data download is canceled") - - succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) { + succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled if dataDownload.Status.StartTimestamp.IsZero() { dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} @@ -473,31 +505,29 @@ func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Conte if err != nil { log.WithError(err).Error("error updating datadownload status") - return + return false } else if !succeeded { log.Warn("conflict in updating datadownload status and will try it again later") - return + return false } // success update r.metrics.RegisterDataDownloadCancel(r.nodeName) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + + log.Warn("data download is canceled") + + return true } func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { log := r.logger.WithField("datadownload", ddName) - var dd velerov2alpha1api.DataDownload - if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil { - log.WithError(err).Warn("Failed to get data download on progress") - return - } - - original := dd.DeepCopy() - dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} - - if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Failed to update restore snapshot progress") + if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: ddName}, log, func(dd *velerov2alpha1api.DataDownload) bool { + dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} + return true + }); err != nil { + log.WithError(err).Error("Failed to update progress") } } @@ -509,7 +539,19 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error { gp := kube.NewGenericEventPredicate(func(object client.Object) bool { dd := object.(*velerov2alpha1api.DataDownload) - return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted) + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { + return true + } + + if dd.Spec.Cancel && !isDataDownloadInFinalState(dd) { + return true + } + + if isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero() { + return true + } + + return false }) s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataDownload), r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{ Predicates: []predicate.Predicate{gp}, @@ -568,10 +610,17 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context, if pod.Status.Phase == corev1api.PodRunning { log.Info("Preparing data download") - // we don't expect anyone else update the CR during the Prepare process - updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload) - if err != nil || !updated { - log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload") + if err = UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, + func(dd *velerov2alpha1api.DataDownload) bool { + if isDataDownloadInFinalState(dd) { + log.Warnf("datadownload %s is terminated, abort setting it to prepared", dd.Name) + return false + } + + r.prepareDataDownload(dd) + return true + }); err != nil { + log.WithError(err).Warn("failed to update dataudownload, prepare will halt for this dataudownload") return []reconcile.Request{} } } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { @@ -618,13 +667,19 @@ func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha } func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error { - log.Infof("update data download status to %v", dd.Status.Phase) - original := dd.DeepCopy() - dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed - dd.Status.Message = errors.WithMessage(err, msg).Error() - dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + log.Info("update data download status to Failed") - if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil { + if patchErr := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool { + if isDataDownloadInFinalState(dd) { + return false + } + + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed + dd.Status.Message = errors.WithMessage(err, msg).Error() + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + + return true + }); patchErr != nil { log.WithError(patchErr).Error("error updating DataDownload status") } else { r.metrics.RegisterDataDownloadFailure(r.nodeName) @@ -647,7 +702,7 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()} } - succeeded, err := r.exclusiveUpdateDataDownload(ctx, updated, updateFunc) + succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, updated, updateFunc) if err != nil { return false, err @@ -667,7 +722,7 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler log := r.logger.WithField("DataDownload", dd.Name) log.Info("Timeout happened for preparing datadownload") - succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { + succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dd *velerov2alpha1api.DataDownload) { dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed dd.Status.Message = "timeout on preparing data download" }) @@ -678,7 +733,7 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler } if !succeeded { - log.Warn("Dataupload has been updated by others") + log.Warn("Datadownload has been updated by others") return } @@ -689,16 +744,18 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - log.Info("Dataupload has been cleaned up") + log.Info("Datadownload has been cleaned up") r.metrics.RegisterDataDownloadFailure(r.nodeName) } -func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, +var funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload + +func exclusiveUpdateDataDownload(ctx context.Context, cli client.Client, dd *velerov2alpha1api.DataDownload, updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) { updateFunc(dd) - err := r.client.Update(ctx, dd) + err := cli.Update(ctx, dd) if err == nil { return true, nil @@ -805,7 +862,7 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool { dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted } -func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataDownload) bool) error { +func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataDownload) bool) error { return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { dd := &velerov2alpha1api.DataDownload{} if err := client.Get(ctx, namespacedName, dd); err != nil { @@ -839,11 +896,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, for i := range dataDownloads.Items { dd := &dataDownloads.Items[i] - if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { - // keep doing nothing let controller re-download the data - // the Prepared CR could be still handled by datadownload controller after node-agent restart - logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared") - } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { if dd.Status.Node != r.nodeName { logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node) continue @@ -870,25 +923,12 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, return true }) if err != nil { - logger.WithError(errors.WithStack(err)).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") - } - } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { - r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase") - - err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, - r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool { - if dataDownload.Spec.Cancel { - return false - } - - dataDownload.Spec.Cancel = true - dataDownload.Status.Message = "Datadownload is in Accepted status during the node-agent starting, mark it as cancel" - - return true - }) - if err != nil { - r.logger.WithField("datadownload", dd.GetName()).WithError(err).Errorf("Failed to trigger dataupload cancel") + logger.WithError(errors.WithStack(err)).WithError(errors.WithStack(err)).Error("Failed to trigger datadownload cancel") } + } else { + // the Prepared CR could be still handled by datadownload controller after node-agent restart + // the accepted CR may also suvived from node-agent restart as long as the intermediate objects are all done + logger.WithField("datadownload", dd.GetName()).Infof("find a datadownload with status %s", dd.Status.Phase) } } @@ -914,7 +954,7 @@ func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context, OnProgress: r.OnDataDownloadProgress, } - asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log) + asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log) if err != nil { return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name) } diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index b81ba7573..fa2c62130 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -37,7 +37,6 @@ import ( clientgofake "k8s.io/client-go/kubernetes/fake" ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -68,7 +67,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder { PV: "test-pv", PVC: "test-pvc", Namespace: "test-ns", - }).NodeOS(velerov2alpha1api.NodeOS("linux")) + }) } func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { @@ -171,89 +170,204 @@ func TestDataDownloadReconcile(t *testing.T) { node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result() tests := []struct { - name string - dd *velerov2alpha1api.DataDownload - targetPVC *corev1api.PersistentVolumeClaim - dataMgr *datapath.Manager - needErrs []bool - needCreateFSBR bool - isExposeErr bool - isGetExposeErr bool - isPeekExposeErr bool - isNilExposer bool - isFSBRInitErr bool - isFSBRRestoreErr bool - notNilExpose bool - notMockCleanUp bool - mockInit bool - mockInitErr error - mockStart bool - mockStartErr error - mockCancel bool - mockClose bool - expected *velerov2alpha1api.DataDownload - expectedStatusMsg string - checkFunc func(du velerov2alpha1api.DataDownload) bool - expectedResult *ctrl.Result + name string + dd *velerov2alpha1api.DataDownload + notCreateDD bool + targetPVC *corev1api.PersistentVolumeClaim + dataMgr *datapath.Manager + needErrs []bool + needCreateFSBR bool + needDelete bool + sportTime *metav1.Time + isExposeErr bool + isGetExposeErr bool + isGetExposeNil bool + isPeekExposeErr bool + isNilExposer bool + notNilExpose bool + notMockCleanUp bool + mockInit bool + mockInitErr error + mockStart bool + mockStartErr error + mockCancel bool + mockClose bool + needExclusiveUpdateError error + expected *velerov2alpha1api.DataDownload + expectDeleted bool + expectCancelRecord bool + expectedResult *ctrl.Result + expectedErr string + expectDataPath bool }{ { - name: "Unknown data download status", - dd: dataDownloadBuilder().Phase("Unknown").Cancel(true).Result(), + name: "dd not found", + dd: dataDownloadBuilder().Result(), + notCreateDD: true, + }, + { + name: "dd not created in velero default namespace", + dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(), + }, + { + name: "get dd fail", + dd: dataDownloadBuilder().Result(), + needErrs: []bool{true, false, false, false}, + expectedErr: "Get error", + }, + { + name: "dd is not for built-in dm", + dd: dataDownloadBuilder().DataMover("other").Result(), + }, + { + name: "add finalizer to dd", + dd: dataDownloadBuilder().Result(), + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + }, + { + name: "add finalizer to dd failed", + dd: dataDownloadBuilder().Result(), + needErrs: []bool{false, false, true, false}, + expectedErr: "error updating datadownload velero/datadownload-1: Update error", + }, + { + name: "dd is under deletion", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + needDelete: true, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), + }, + { + name: "dd is under deletion but cancel failed", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + needErrs: []bool{false, false, true, false}, + needDelete: true, + expectedErr: "error updating datadownload velero/datadownload-1: Update error", + }, + { + name: "dd is under deletion and in terminal state", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(), + sportTime: &metav1.Time{Time: time.Now()}, + needDelete: true, + expectDeleted: true, + }, + { + name: "dd is under deletion and in terminal state, but remove finalizer failed", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(), + needErrs: []bool{false, false, true, false}, + needDelete: true, + expectedErr: "error updating datadownload velero/datadownload-1: Update error", + }, + { + name: "delay cancel negative for others", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + sportTime: &metav1.Time{Time: time.Now()}, + expectCancelRecord: true, + }, + { + name: "delay cancel negative for inProgress", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)}, + expectCancelRecord: true, + }, + { + name: "delay cancel affirmative for others", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)}, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), + }, + { + name: "delay cancel affirmative for inProgress", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)}, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), + }, + { + name: "delay cancel failed", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + needErrs: []bool{false, false, true, false}, + sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)}, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + expectCancelRecord: true, + }, + { + name: "Unknown data download status", + dd: dataDownloadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + }, + { + name: "new dd but no target PVC", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedResult: &ctrl.Result{Requeue: true}, + }, + { + name: "new dd but accept failed", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needExclusiveUpdateError: errors.New("exclusive-update-error"), + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedErr: "error accepting the data download datadownload-1: exclusive-update-error", + }, + { + name: "dd is cancel on accepted", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), }, { - name: "Cancel data downloand in progress and patch data download error", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, false, true}, - needCreateFSBR: true, - expectedStatusMsg: "Patch error", + name: "dd is accepted but setup expose param failed", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Phase(velerov2alpha1api.DataDownloadPhaseFailed).Message("failed to set exposer parameters").Result(), + expectedErr: "no appropriate node to run datadownload velero/datadownload-1: node with OS xxx doesn't exist", }, { - name: "Cancel data downloand in progress with empty FSBR", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - mockCancel: true, + name: "dd expose failed", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + isExposeErr: true, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Message("error to expose snapshot").Result(), + expectedErr: "Error to expose restore exposer", }, { - name: "Cancel data downloand in progress with create FSBR", - dd: func() *velerov2alpha1api.DataDownload { - dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() - dd.Status.Node = "test-node" - return dd - }(), + name: "dd succeeds for accepted", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + }, + { + name: "prepare timeout on accepted", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Finalizers([]string{DataUploadDownloadFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(), + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Message("timeout on preparing data download").Result(), + }, + { + name: "peek error on accepted", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + isPeekExposeErr: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Message("found a datadownload velero/datadownload-1 with expose error: fake-peek-error. mark it as cancel").Result(), + }, + { + name: "cancel on prepared", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Cancel(true).Result(), + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), + }, + { + name: "Failed to get restore expose on prepared", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needCreateFSBR: true, - mockCancel: true, - expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Result(), + isGetExposeErr: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("restore exposer is not ready").Result(), + expectedErr: "Error to get restore exposer", }, { - name: "Cancel data downloand in progress without create FSBR", - dd: func() *velerov2alpha1api.DataDownload { - dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() - dd.Status.Node = "test-node" - return dd - }(), + name: "Get nil restore expose on prepared", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needCreateFSBR: false, - mockCancel: true, - expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), - }, - { - name: "Cancel data downloand in progress in different node", - dd: func() *velerov2alpha1api.DataDownload { - dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() - dd.Status.Node = "different-node" - return dd - }(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needCreateFSBR: false, - mockCancel: true, - expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + isGetExposeNil: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("exposed snapshot is not ready").Result(), + expectedErr: "no expose result is available for the current node", }, { name: "Error in data path is concurrent limited", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), dataMgr: datapath.NewManager(0), notNilExpose: true, @@ -261,158 +375,104 @@ func TestDataDownloadReconcile(t *testing.T) { expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "data path init error", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - mockInit: true, - mockInitErr: errors.New("fake-data-path-init-error"), - mockClose: true, - notNilExpose: true, - expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error", + name: "data path init error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockInitErr: errors.New("fake-data-path-init-error"), + mockClose: true, + notNilExpose: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error initializing data path").Result(), + expectedErr: "error initializing asyncBR: fake-data-path-init-error", }, { name: "Unable to update status to in progress for data download", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, false, true}, + needErrs: []bool{false, false, true, false}, mockInit: true, mockClose: true, notNilExpose: true, notMockCleanUp: true, - expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "data path start error", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - mockInit: true, - mockStart: true, - mockStartErr: errors.New("fake-data-path-start-error"), - mockClose: true, - notNilExpose: true, - expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", + name: "data path start error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockStart: true, + mockStartErr: errors.New("fake-data-path-start-error"), + mockClose: true, + notNilExpose: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error starting data path").Result(), + expectedErr: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", }, { - name: "accept DataDownload error", - dd: dataDownloadBuilder().Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, true, false}, - expectedStatusMsg: "Update error", + name: "Prepare succeeds", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockStart: true, + notNilExpose: true, + notMockCleanUp: true, + expectDataPath: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "Not create target pvc", - dd: dataDownloadBuilder().Result(), + name: "In progress dd is not handled by the current node", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "Uninitialized dataDownload", - dd: dataDownloadBuilder().Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - isNilExposer: true, - expectedStatusMsg: "uninitialized generic exposer", + name: "In progress dd is not set as cancel", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "DataDownload not created in velero default namespace", - dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + name: "Cancel data downloand in progress with empty FSBR", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "Failed to get dataDownload", - dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{true, false, false, false}, - expectedStatusMsg: "Get error", + name: "Cancel data downloand in progress and patch data download error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + needErrs: []bool{false, false, true, false}, + needCreateFSBR: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedErr: "error updating datadownload velero/datadownload-1: Update error", + expectCancelRecord: true, + expectDataPath: true, }, { - name: "Unsupported dataDownload type", - dd: dataDownloadBuilder().DataMover("Unsuppoorted type").Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - }, - { - name: "Restore is exposed", - dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSLinux).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - }, - { - name: "Expected node doesn't exist", - dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSWindows).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - expectedStatusMsg: "no appropriate node to run datadownload", - }, - { - name: "Get empty restore exposer", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - }, - { - name: "Failed to get restore exposer", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - expectedStatusMsg: "Error to get restore exposer", - isGetExposeErr: true, - }, - { - name: "Error to start restore expose", - dd: dataDownloadBuilder().Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - expectedStatusMsg: "Error to expose restore exposer", - isExposeErr: true, - }, - { - name: "prepare timeout", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), - expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(), - }, - { - name: "peek error", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), - isPeekExposeErr: true, - expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), - }, - { - name: "dataDownload with enabled cancel", - dd: func() *velerov2alpha1api.DataDownload { - dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result() - controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer) - dd.DeletionTimestamp = &metav1.Time{Time: time.Now()} - return dd - }(), - checkFunc: func(du velerov2alpha1api.DataDownload) bool { - return du.Spec.Cancel - }, - expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), - }, - { - name: "dataDownload with remove finalizer and should not be retrieved", - dd: func() *velerov2alpha1api.DataDownload { - dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Cancel(true).Result() - controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer) - dd.DeletionTimestamp = &metav1.Time{Time: time.Now()} - return dd - }(), - checkFunc: func(dd velerov2alpha1api.DataDownload) bool { - return !controllerutil.ContainsFinalizer(&dd, DataUploadDownloadFinalizer) - }, + name: "Cancel data downloand in progress succeeds", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + needCreateFSBR: true, + mockCancel: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectDataPath: true, + expectCancelRecord: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { objs := []runtime.Object{daemonSet, node} + if test.targetPVC != nil { objs = append(objs, test.targetPVC) } + r, err := initDataDownloadReconciler(objs, test.needErrs...) require.NoError(t, err) - defer func() { - r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{}) - if test.targetPVC != nil { - r.client.Delete(ctx, test.targetPVC, &kbclient.DeleteOptions{}) - } - }() - ctx := context.Background() - if test.dd.Namespace == velerov1api.DefaultNamespace { - err = r.client.Create(ctx, test.dd) + if !test.notCreateDD { + err = r.client.Create(context.Background(), test.dd) + require.NoError(t, err) + } + + if test.needDelete { + err = r.client.Delete(context.Background(), test.dd) require.NoError(t, err) } @@ -422,6 +482,17 @@ func TestDataDownloadReconcile(t *testing.T) { r.dataPathMgr = datapath.NewManager(1) } + if test.sportTime != nil { + r.cancelledDataDownload[test.dd.Name] = test.sportTime.Time + } + + funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload + if test.needExclusiveUpdateError != nil { + funcExclusiveUpdateDataDownload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataDownload, func(*velerov2alpha1api.DataDownload)) (bool, error) { + return false, test.needExclusiveUpdateError + } + } + datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { asyncBR := datapathmockes.NewAsyncBR(t) @@ -444,7 +515,7 @@ func TestDataDownloadReconcile(t *testing.T) { return asyncBR } - if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose { + if test.isExposeErr || test.isGetExposeErr || test.isGetExposeNil || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose { if test.isNilExposer { r.restoreExposer = nil } else { @@ -458,6 +529,8 @@ func TestDataDownloadReconcile(t *testing.T) { ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil) } else if test.isGetExposeErr { ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer")) + } else if test.isGetExposeNil { + ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) } else if test.isPeekExposeErr { ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error")) } @@ -485,48 +558,47 @@ func TestDataDownloadReconcile(t *testing.T) { }, }) - if test.expectedStatusMsg != "" { - require.ErrorContains(t, err, test.expectedStatusMsg) + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) } else { - require.NoError(t, err) + assert.NoError(t, err) } - require.NotNil(t, actualResult) - if test.expectedResult != nil { assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue) assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) } - dd := velerov2alpha1api.DataDownload{} - err = r.client.Get(ctx, kbclient.ObjectKey{ - Name: test.dd.Name, - Namespace: test.dd.Namespace, - }, &dd) + if test.expected != nil || test.expectDeleted { + dd := velerov2alpha1api.DataDownload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.dd.Name, + Namespace: test.dd.Namespace, + }, &dd) - if test.expected != nil { - require.NoError(t, err) - assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase) - } - - if test.isGetExposeErr { - assert.Contains(t, dd.Status.Message, test.expectedStatusMsg) - } - if test.dd.Namespace == velerov1api.DefaultNamespace { - if controllerutil.ContainsFinalizer(test.dd, DataUploadDownloadFinalizer) { - assert.True(t, true, apierrors.IsNotFound(err)) //nolint:testifylint //FIXME + if test.expectDeleted { + assert.True(t, apierrors.IsNotFound(err)) } else { require.NoError(t, err) + + assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase) + assert.Contains(t, dd.Status.Message, test.expected.Status.Message) + assert.Equal(t, dd.Finalizers, test.expected.Finalizers) + assert.Equal(t, dd.Spec.Cancel, test.expected.Spec.Cancel) } - } else { - assert.True(t, true, apierrors.IsNotFound(err)) //nolint:testifylint //FIXME } - if !test.needCreateFSBR { + if !test.expectDataPath { assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) + } else { + assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) } - t.Logf("%s: \n %v \n", test.name, dd) + if test.expectCancelRecord { + assert.Contains(t, r.cancelledDataDownload, test.dd.Name) + } else { + assert.Empty(t, r.cancelledDataDownload) + } }) } } @@ -661,7 +733,7 @@ func TestOnDataDownloadProgress(t *testing.T) { { name: "failed to patch datadownload", dd: dataDownloadBuilder().Result(), - needErrs: []bool{false, false, false, true}, + needErrs: []bool{false, false, true, false}, }, } for _, test := range tests { @@ -883,7 +955,7 @@ func TestTryCancelDataDownload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.tryCancelAcceptedDataDownload(ctx, test.dd, "") + r.tryCancelDataDownload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) @@ -1010,33 +1082,12 @@ func TestAttemptDataDownloadResume(t *testing.T) { expectedError string }{ { - name: "accepted DataDownload with no dd label", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), - cancelledDataDownloads: []string{dataDownloadName}, - acceptedDataDownloads: []string{dataDownloadName}, + name: "Other DataDownload", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), }, { - name: "accepted DataDownload in the current node", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).AcceptedByNode("node-1").Result(), - cancelledDataDownloads: []string{dataDownloadName}, - acceptedDataDownloads: []string{dataDownloadName}, - }, - { - name: "accepted DataDownload with dd label but is canceled", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Cancel(true).AcceptedByNode("node-1").Result(), - acceptedDataDownloads: []string{dataDownloadName}, - cancelledDataDownloads: []string{dataDownloadName}, - }, - { - name: "accepted DataDownload with dd label but cancel fail", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).AcceptedByNode("node-1").Result(), - needErrs: []bool{false, false, true, false, false, false}, - acceptedDataDownloads: []string{dataDownloadName}, - }, - { - name: "prepared DataDownload", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - prepareddDataDownloads: []string{dataDownloadName}, + name: "Other DataDownload", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), }, { name: "InProgress DataDownload, not the current node", diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index faa40ad9a..f9aa2fad4 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -59,6 +59,8 @@ const ( dataUploadDownloadRequestor = "snapshot-data-upload-download" DataUploadDownloadFinalizer = "velero.io/data-upload-download-finalizer" preparingMonitorFrequency = time.Minute + cancelDelayInProgress = time.Hour + cancelDelayOthers = time.Minute * 5 ) // DataUploadReconciler reconciles a DataUpload object @@ -77,6 +79,7 @@ type DataUploadReconciler struct { podResources corev1api.ResourceRequirements preparingTimeout time.Duration metrics *metrics.ServerMetrics + cancelledDataUpload map[string]time.Time } func NewDataUploadReconciler( @@ -109,12 +112,13 @@ func NewDataUploadReconciler( log, ), }, - dataPathMgr: dataPathMgr, - loadAffinity: loadAffinity, - backupPVCConfig: backupPVCConfig, - podResources: podResources, - preparingTimeout: preparingTimeout, - metrics: metrics, + dataPathMgr: dataPathMgr, + loadAffinity: loadAffinity, + backupPVCConfig: backupPVCConfig, + podResources: podResources, + preparingTimeout: preparingTimeout, + metrics: metrics, + cancelledDataUpload: make(map[string]time.Time), } } @@ -144,52 +148,104 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } + // Logic for clear resources when dataupload been deleted + if !isDataUploadInFinalState(du) { + if !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) { + if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool { + if controllerutil.ContainsFinalizer(dataUpload, DataUploadDownloadFinalizer) { + return false + } + + controllerutil.AddFinalizer(dataUpload, DataUploadDownloadFinalizer) + + return true + }); err != nil { + log.WithError(err).Errorf("failed to add finalizer for du %s/%s", du.Namespace, du.Name) + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil + } + + if !du.DeletionTimestamp.IsZero() { + if !du.Spec.Cancel { + // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism + // to help clear up resources instead of clear them directly in case of some conflict with Expose action + log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase) + + if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool { + if dataUpload.Spec.Cancel { + return false + } + + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = "Cancel dataupload because it is being deleted" + + return true + }); err != nil { + log.WithError(err).Errorf("failed to set cancel flag for du %s/%s", du.Namespace, du.Name) + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil + } + } + } else { + delete(r.cancelledDataUpload, du.Name) + + // put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status + // instead of intermediate state. + // remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up + // also in final status cr won't block the direct delete of the velero namespace + if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) { + if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(du *velerov2alpha1api.DataUpload) bool { + if !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) { + return false + } + + controllerutil.RemoveFinalizer(du, DataUploadDownloadFinalizer) + + return true + }); err != nil { + log.WithError(err).Error("error to remove finalizer") + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil + } + } + + if du.Spec.Cancel { + if spotted, found := r.cancelledDataUpload[du.Name]; !found { + r.cancelledDataUpload[du.Name] = r.Clock.Now() + } else { + delay := cancelDelayOthers + if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + delay = cancelDelayInProgress + } + + if time.Since(spotted) > delay { + log.Infof("Data upload %s is canceled in Phase %s but not handled in reasonable time", du.GetName(), du.Status.Phase) + if r.tryCancelDataUpload(ctx, du, "") { + delete(r.cancelledDataUpload, du.Name) + } + + return ctrl.Result{}, nil + } + } + } + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] if !ok { return r.errorOut(ctx, du, errors.Errorf("%s type of snapshot exposer is not exist", du.Spec.SnapshotType), "not exist type of exposer", log) } - // Logic for clear resources when dataupload been deleted - if du.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning - if !isDataUploadInFinalState(du) && !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) { - succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { - controllerutil.AddFinalizer(du, DataUploadDownloadFinalizer) - }) - - if err != nil { - log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), du.Namespace, du.Name) - return ctrl.Result{}, err - } else if !succeeded { - log.Warnf("failed to add finalizer for %s/%s and will requeue later", du.Namespace, du.Name) - return ctrl.Result{Requeue: true}, nil - } - } - } else if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) && !du.Spec.Cancel && !isDataUploadInFinalState(du) { - // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism - // to help clear up resources instead of clear them directly in case of some conflict with Expose action - log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase) - - if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool { - if dataUpload.Spec.Cancel { - return false - } - - dataUpload.Spec.Cancel = true - dataUpload.Status.Message = "Cancel dataupload because it is being deleted" - - return true - }); err != nil { - log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), du.Namespace, du.Name) - return ctrl.Result{}, err - } - } - if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew { log.Info("Data upload starting") accepted, err := r.acceptDataUpload(ctx, du) if err != nil { - return r.errorOut(ctx, du, err, "error to accept the data upload", log) + return ctrl.Result{}, errors.Wrapf(err, "error accepting the data upload %s", du.Name) } if !accepted { @@ -213,48 +269,15 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // but the pod maybe is not in the same node of the current controller, so we need to return it here. // And then only the controller who is in the same node could do the rest work. if err := ep.Expose(ctx, getOwnerObject(du), exposeParam); err != nil { - if err := r.client.Get(ctx, req.NamespacedName, du); err != nil { - if !apierrors.IsNotFound(err) { - return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") - } - } - if isDataUploadInFinalState(du) { - log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err) - r.cleanUp(ctx, du, log) - return ctrl.Result{}, nil - } else { - return r.errorOut(ctx, du, err, "error to expose snapshot", log) - } + return r.errorOut(ctx, du, err, "error exposing snapshot", log) } log.Info("Snapshot is exposed") - // we need to get CR again for it may canceled by dataupload controller on other - // nodes when doing expose action, if detectd cancel action we need to clear up the internal - // resources created by velero during backup. - if err := r.client.Get(ctx, req.NamespacedName, du); err != nil { - if apierrors.IsNotFound(err) { - log.Debug("Unable to find DataUpload") - return ctrl.Result{}, nil - } - return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") - } - - // we need to clean up resources as resources created in Expose it may later than cancel action or prepare time - // and need to clean up resources again - if isDataUploadInFinalState(du) { - r.cleanUp(ctx, du, log) - } - return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { - if du.Spec.Cancel { - // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action - // we could retry when the CR requeue in periodcally - log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) - r.tryCancelAcceptedDataUpload(ctx, du, "") - } else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { - r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) + if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { + r.tryCancelDataUpload(ctx, du, fmt.Sprintf("found a du %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) } else if du.Status.AcceptedTimestamp != nil { if time.Since(du.Status.AcceptedTimestamp.Time) >= r.preparingTimeout { @@ -264,9 +287,14 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { - log.Info("Data upload is prepared") + log.Infof("Data upload is prepared and should be processed by %s (%s)", du.Status.Node, r.nodeName) + + if du.Status.Node != r.nodeName { + return ctrl.Result{}, nil + } if du.Spec.Cancel { + log.Info("Prepared data upload is being canceled") r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) return ctrl.Result{}, nil } @@ -281,8 +309,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { return r.errorOut(ctx, du, err, "exposed snapshot is not ready", log) } else if res == nil { - log.Debug("Get empty exposer") - return ctrl.Result{}, nil + return r.errorOut(ctx, du, errors.New("no expose result is available for the current node"), "exposed snapshot is not ready", log) } if res.ByPod.NodeOS == nil { @@ -318,17 +345,31 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Update status to InProgress - original := du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress - du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} - du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS) - if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { + terminated := false + if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool { + if isDataUploadInFinalState(du) { + terminated = true + return false + } + + du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress + du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS) + + return true + }); err != nil { log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) r.closeDataPath(ctx, du.Name) return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } + if terminated { + log.Warnf("dataupload %s is terminated during transition from prepared", du.Name) + r.closeDataPath(ctx, du.Name) + return ctrl.Result{}, nil + } + log.Info("Data upload is marked as in progress") if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil { @@ -340,45 +381,39 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { - log.Info("Data upload is in progress") if du.Spec.Cancel { - log.Info("Data upload is being canceled") + if du.Status.Node != r.nodeName { + return ctrl.Result{}, nil + } + + log.Info("In progress data upload is being canceled") asyncBR := r.dataPathMgr.GetAsyncBR(du.Name) if asyncBR == nil { - if du.Status.Node == r.nodeName { - r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) - } else { - log.Info("Data path is not started in this node and will not canceled by current node") - } + r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) return ctrl.Result{}, nil } // Update status to Canceling - original := du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling - if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { + if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool { + if isDataUploadInFinalState(du) { + log.Warnf("dataupload %s is terminated, abort setting it to canceling", du.Name) + return false + } + + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling + return true + }); err != nil { log.WithError(err).Error("error updating data upload into canceling status") return ctrl.Result{}, err } + asyncBR.Cancel() return ctrl.Result{}, nil } - return ctrl.Result{}, nil - } else { - // put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status - // instead of intermediate state. - // remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up - // also in final status cr won't block the direct delete of the velero namespace - if isDataUploadInFinalState(du) && controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) { - original := du.DeepCopy() - controllerutil.RemoveFinalizer(du, DataUploadDownloadFinalizer) - if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error to remove finalizer") - } - } - return ctrl.Result{}, nil } + + return ctrl.Result{}, nil } func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { @@ -488,14 +523,14 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp log.WithError(err).Error("error updating DataUpload status") } else { r.metrics.RegisterDataUploadCancel(r.nodeName) + delete(r.cancelledDataUpload, du.Name) } } } -func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { +func (r *DataUploadReconciler) tryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) bool { log := r.logger.WithField("dataupload", du.Name) - log.Warn("Accepted data upload is canceled") - succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { + succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(dataUpload *velerov2alpha1api.DataUpload) { dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if dataUpload.Status.StartTimestamp.IsZero() { dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} @@ -509,16 +544,20 @@ func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, if err != nil { log.WithError(err).Error("error updating dataupload status") - return + return false } else if !succeeded { log.Warn("conflict in updating dataupload status and will try it again later") - return + return false } // success update r.metrics.RegisterDataUploadCancel(r.nodeName) // cleans up any objects generated during the snapshot expose r.cleanUp(ctx, du, log) + + log.Warn("data upload is canceled") + + return true } func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) { @@ -538,16 +577,10 @@ func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1ap func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) { log := r.logger.WithField("dataupload", duName) - var du velerov2alpha1api.DataUpload - if err := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); err != nil { - log.WithError(err).Warn("Failed to get dataupload on progress") - return - } - - original := du.DeepCopy() - du.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} - - if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: duName}, log, func(du *velerov2alpha1api.DataUpload) bool { + du.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} + return true + }); err != nil { log.WithError(err).Error("Failed to update progress") } } @@ -560,7 +593,19 @@ func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespa func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error { gp := kube.NewGenericEventPredicate(func(object client.Object) bool { du := object.(*velerov2alpha1api.DataUpload) - return (du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted) + if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { + return true + } + + if du.Spec.Cancel && !isDataUploadInFinalState(du) { + return true + } + + if isDataUploadInFinalState(du) && !du.DeletionTimestamp.IsZero() { + return true + } + + return false }) s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataUpload), r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{ Predicates: []predicate.Predicate{gp}, @@ -621,10 +666,17 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj if pod.Status.Phase == corev1api.PodRunning { log.Info("Preparing dataupload") - // we don't expect anyone else update the CR during the Prepare process - updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload) - if err != nil || !updated { - log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload") + if err = UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, + func(du *velerov2alpha1api.DataUpload) bool { + if isDataUploadInFinalState(du) { + log.Warnf("dataupload %s is terminated, abort setting it to prepared", du.Name) + return false + } + + r.prepareDataUpload(du) + return true + }); err != nil { + log.WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload") return []reconcile.Request{} } } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early @@ -678,18 +730,26 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) error { - original := du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed - du.Status.Message = errors.WithMessage(err, msg).Error() - if du.Status.StartTimestamp.IsZero() { - du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} - } + log.Info("update data upload status to Failed") - if dataPathError, ok := err.(datapath.DataPathError); ok { - du.Status.SnapshotID = dataPathError.GetSnapshotID() - } - du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil { + if patchErr := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool { + if isDataUploadInFinalState(du) { + return false + } + + du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed + du.Status.Message = errors.WithMessage(err, msg).Error() + if du.Status.StartTimestamp.IsZero() { + du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + } + + if dataPathError, ok := err.(datapath.DataPathError); ok { + du.Status.SnapshotID = dataPathError.GetSnapshotID() + } + du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + + return true + }); patchErr != nil { log.WithError(patchErr).Error("error updating DataUpload status") } else { r.metrics.RegisterDataUploadFailure(r.nodeName) @@ -711,7 +771,7 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov dataUpload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()} } - succeeded, err := r.exclusiveUpdateDataUpload(ctx, updated, updateFunc) + succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, updated, updateFunc) if err != nil { return false, err @@ -732,7 +792,7 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov log.Info("Timeout happened for preparing dataupload") - succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { + succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(du *velerov2alpha1api.DataUpload) { du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed du.Status.Message = "timeout on preparing data upload" }) @@ -770,11 +830,13 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov r.metrics.RegisterDataUploadFailure(r.nodeName) } -func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, +var funcExclusiveUpdateDataUpload = exclusiveUpdateDataUpload + +func exclusiveUpdateDataUpload(ctx context.Context, cli client.Client, du *velerov2alpha1api.DataUpload, updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) { updateFunc(du) - err := r.client.Update(ctx, du) + err := cli.Update(ctx, du) if err == nil { return true, nil } @@ -905,7 +967,7 @@ func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool { du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted } -func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataUpload) bool) error { +func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataUpload) bool) error { return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { du := &velerov2alpha1api.DataUpload{} if err := client.Get(ctx, namespacedName, du); err != nil { @@ -939,11 +1001,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logg for i := range dataUploads.Items { du := &dataUploads.Items[i] - if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { - // keep doing nothing let controller re-download the data - // the Prepared CR could be still handled by dataupload controller after node-agent restart - logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared") - } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { if du.Status.Node != r.nodeName { logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node) continue @@ -972,23 +1030,10 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logg if err != nil { logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") } - } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { - r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase") - - err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) bool { - if dataUpload.Spec.Cancel { - return false - } - - dataUpload.Spec.Cancel = true - dataUpload.Status.Message = "Dataupload is in Accepted status during the node-agent starting, mark it as cancel" - - return true - }) - if err != nil { - r.logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") - } + } else { + // the Prepared CR could be still handled by dataupload controller after node-agent restart + // the accepted CR may also suvived from node-agent restart as long as the intermediate objects are all done + logger.WithField("dataupload", du.GetName()).Infof("find a dataupload with status %s", du.Status.Phase) } } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 6d186d2d0..11c0e8da4 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -45,7 +45,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -270,35 +269,30 @@ type fakeSnapshotExposer struct { clock clock.WithTickerAndDelayedExecution ambiguousNodeOS bool peekErr error + exposeErr error + getErr error + getNil bool } func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param any) error { - du := velerov2alpha1api.DataUpload{} - err := f.kubeClient.Get(ctx, kbclient.ObjectKey{ - Name: dataUploadName, - Namespace: velerov1api.DefaultNamespace, - }, &du) - if err != nil { - return err + if f.exposeErr != nil { + return f.exposeErr } - original := du - du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared - du.Status.StartTimestamp = &metav1.Time{Time: f.clock.Now()} - f.kubeClient.Patch(ctx, &du, kbclient.MergeFrom(&original)) return nil } func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1api.ObjectReference, tm time.Duration, para any) (*exposer.ExposeResult, error) { - pod := &corev1api.Pod{} - err := f.kubeClient.Get(ctx, kbclient.ObjectKey{ - Name: dataUploadName, - Namespace: velerov1api.DefaultNamespace, - }, pod) - if err != nil { - return nil, err + if f.getErr != nil { + return nil, f.getErr } + if f.getNil { + return nil, nil + } + + pod := &corev1api.Pod{} + nodeOS := "linux" pNodeOS := &nodeOS if f.ambiguousNodeOS { @@ -346,214 +340,265 @@ func (b *fakeDataUploadFSBR) Close(ctx context.Context) { func TestReconcile(t *testing.T) { tests := []struct { - name string - du *velerov2alpha1api.DataUpload - pod *corev1api.Pod - pvc *corev1api.PersistentVolumeClaim - snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer - dataMgr *datapath.Manager - expectedProcessed bool - expected *velerov2alpha1api.DataUpload - checkFunc func(velerov2alpha1api.DataUpload) bool - expectedRequeue ctrl.Result - expectedErrMsg string - needErrs []bool - removeNode bool - ambiguousNodeOS bool - peekErr error - notCreateFSBR bool - fsBRInitErr error - fsBRStartErr error + name string + du *velerov2alpha1api.DataUpload + notCreateDU bool + needDelete bool + sportTime *metav1.Time + pod *corev1api.Pod + pvc *corev1api.PersistentVolumeClaim + snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer + dataMgr *datapath.Manager + needCreateFSBR bool + needExclusiveUpdateError error + expected *velerov2alpha1api.DataUpload + expectDeleted bool + expectCancelRecord bool + needErrs []bool + ambiguousNodeOS bool + peekErr error + exposeErr error + getExposeErr error + getExposeNil bool + fsBRInitErr error + fsBRStartErr error + expectedErr string + expectedResult *ctrl.Result + expectDataPath bool }{ { - name: "Dataupload is not initialized", - du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), - expectedRequeue: ctrl.Result{}, + name: "du not found", + du: dataUploadBuilder().Result(), + notCreateDU: true, }, { - name: "Error get Dataupload", - du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "getting DataUpload: Get error", - needErrs: []bool{true, false, false, false}, + name: "du not created in velero default namespace", + du: builder.ForDataUpload("test-ns", dataUploadName).Result(), }, { - name: "Unsupported data mover type", - du: dataUploadBuilder().DataMover("unknown type").Result(), - expected: dataUploadBuilder().Phase("").Result(), - expectedRequeue: ctrl.Result{}, + name: "get du fail", + du: dataUploadBuilder().Result(), + needErrs: []bool{true, false, false, false}, + expectedErr: "getting DataUpload: Get error", }, { - name: "Unknown type of snapshot exposer is not initialized", - du: dataUploadBuilder().SnapshotType("unknown type").Result(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "unknown type type of snapshot exposer is not exist", + name: "du is not for built-in dm", + du: dataUploadBuilder().DataMover("other").Result(), }, { - name: "Dataupload should be accepted", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1api.Volume{Name: "test-pvc"}).Result(), - pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - expectedRequeue: ctrl.Result{}, + name: "add finalizer to du", + du: dataUploadBuilder().Result(), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "Dataupload should fail to get PVC information", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1api.Volume{Name: "wrong-pvc"}).Result(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "failed to get PVC", + name: "add finalizer to du failed", + du: dataUploadBuilder().Result(), + needErrs: []bool{false, false, true, false}, + expectedErr: "error updating dataupload with error velero/dataupload-1: Update error", }, { - name: "Dataupload should fail because expected node doesn't exist", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1api.Volume{Name: "test-pvc"}).Result(), - pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), - removeNode: true, - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "no appropriate node to run data upload", + name: "du is under deletion", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + needDelete: true, + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), }, { - name: "Dataupload should be prepared", - du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{}, + name: "du is under deletion but cancel failed", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + needErrs: []bool{false, false, true, false}, + needDelete: true, + expectedErr: "error updating dataupload with error velero/dataupload-1: Update error", }, { - name: "Dataupload prepared should be completed", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, + name: "du is under deletion and in terminal state", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), + sportTime: &metav1.Time{Time: time.Now()}, + needDelete: true, + expectDeleted: true, }, { - name: "Dataupload should fail if expose returns ambiguous nodeOS", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - ambiguousNodeOS: true, - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), - expectedErrMsg: "unsupported ambiguous node OS", + name: "du is under deletion and in terminal state, but remove finalizer failed", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), + needErrs: []bool{false, false, true, false}, + needDelete: true, + expectedErr: "error updating dataupload with error velero/dataupload-1: Update error", }, { - name: "Dataupload with not enabled cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, + name: "delay cancel negative for others", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + sportTime: &metav1.Time{Time: time.Now()}, + expectCancelRecord: true, }, { - name: "Dataupload should be cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), - expectedRequeue: ctrl.Result{}, + name: "delay cancel negative for inProgress", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)}, + expectCancelRecord: true, }, { - name: "Dataupload should be cancel with match node", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: func() *velerov2alpha1api.DataUpload { - du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result() - du.Status.Node = "test-node" - return du - }(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), - expectedRequeue: ctrl.Result{}, - notCreateFSBR: true, + name: "delay cancel affirmative for others", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)}, + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), }, { - name: "Dataupload should not be cancel with mismatch node", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: func() *velerov2alpha1api.DataUpload { - du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result() - du.Status.Node = "different_node" - return du - }(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, - notCreateFSBR: true, + name: "delay cancel affirmative for inProgress", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)}, + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), }, { - name: "runCancelableDataUpload is concurrent limited", - dataMgr: datapath.NewManager(0), - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + name: "delay cancel failed", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + needErrs: []bool{false, false, true, false}, + sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)}, + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectCancelRecord: true, }, { - name: "data path init error", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - fsBRInitErr: errors.New("fake-data-path-init-error"), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), - expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error", + name: "Unknown data upload status", + du: dataUploadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, { - name: "Unable to update status to in progress for data download", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - needErrs: []bool{false, false, false, true}, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + name: "Unknown type of snapshot exposer is not initialized", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).SnapshotType("unknown type").Result(), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), + expectedErr: "unknown type type of snapshot exposer is not exist", }, { - name: "data path start error", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - fsBRStartErr: errors.New("fake-data-path-start-error"), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), - expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error", + name: "new du but accept failed", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + needExclusiveUpdateError: errors.New("exclusive-update-error"), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedErr: "error accepting the data upload dataupload-1: exclusive-update-error", }, { - name: "prepare timeout", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), + name: "du is cancel on accepted", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), }, { - name: "peek error", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result(), - peekErr: errors.New("fake-peek-error"), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), + name: "du is accepted but setup expose param failed on getting PVC", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Message("failed to set exposer parameters").Result(), + expectedErr: "failed to get PVC fake-ns/test-pvc: persistentvolumeclaims \"test-pvc\" not found", }, { - name: "Dataupload with enabled cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: func() *velerov2alpha1api.DataUpload { - du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result() - controllerutil.AddFinalizer(du, DataUploadDownloadFinalizer) - du.DeletionTimestamp = &metav1.Time{Time: time.Now()} - return du - }(), - checkFunc: func(du velerov2alpha1api.DataUpload) bool { - return du.Spec.Cancel - }, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - expectedRequeue: ctrl.Result{}, + name: "du expose failed", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).SnapshotType(fakeSnapshotType).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), + exposeErr: errors.New("fake-expose-error"), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Message("error exposing snapshot").Result(), + expectedErr: "fake-expose-error", }, { - name: "Dataupload with remove finalizer and should not be retrieved", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), - du: func() *velerov2alpha1api.DataUpload { - du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Cancel(true).Result() - controllerutil.AddFinalizer(du, DataUploadDownloadFinalizer) - du.DeletionTimestamp = &metav1.Time{Time: time.Now()} - return du - }(), - checkFunc: func(du velerov2alpha1api.DataUpload) bool { - return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer) - }, - expectedRequeue: ctrl.Result{}, + name: "du succeeds for accepted", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).SnapshotType(fakeSnapshotType).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + }, + { + name: "prepare timeout on accepted", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Finalizers([]string{DataUploadDownloadFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Message("timeout on preparing data upload").Result(), + }, + { + name: "peek error on accepted", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + peekErr: errors.New("fake-peak-error"), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Message("found a du velero/dataupload-1 with expose error: fake-peak-error. mark it as cancel").Result(), + }, + { + name: "cancel on prepared", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Cancel(true).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), + }, + { + name: "Failed to get snapshot expose on prepared", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + getExposeErr: errors.New("fake-get-error"), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("exposed snapshot is not ready: fake-get-error").Result(), + expectedErr: "fake-get-error", + }, + { + name: "Get nil restore expose on prepared", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + getExposeNil: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("exposed snapshot is not ready").Result(), + expectedErr: "no expose result is available for the current node", + }, + { + name: "Dataupload should fail if expose returns ambiguous nodeOS", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + ambiguousNodeOS: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedErr: "unsupported ambiguous node OS", + }, + { + name: "Error in data path is concurrent limited", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + dataMgr: datapath.NewManager(0), + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path init error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + fsBRInitErr: errors.New("fake-data-path-init-error"), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error initializing data path").Result(), + expectedErr: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data upload", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + needErrs: []bool{false, false, true, false}, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + fsBRStartErr: errors.New("fake-data-path-start-error"), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error starting data path").Result(), + expectedErr: "error starting async backup for pod , volume dataupload-1: fake-data-path-start-error", + }, + { + name: "Prepare succeeds", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectDataPath: true, + }, + { + name: "In progress du is not handled by the current node", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + }, + { + name: "In progress du is not set as cancel", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + }, + { + name: "Cancel data upload in progress with empty FSBR", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + }, + { + name: "Cancel data upload in progress and patch data upload error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + needErrs: []bool{false, false, true, false}, + needCreateFSBR: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedErr: "error updating dataupload with error velero/dataupload-1: Update error", + expectCancelRecord: true, + expectDataPath: true, + }, + { + name: "Cancel data upload in progress succeeds", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(), + needCreateFSBR: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectDataPath: true, + expectCancelRecord: true, }, } @@ -561,24 +606,15 @@ func TestReconcile(t *testing.T) { t.Run(test.name, func(t *testing.T) { r, err := initDataUploaderReconciler(test.needErrs...) require.NoError(t, err) - defer func() { - r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) - if test.pod != nil { - r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) - } - }() - ctx := context.Background() - if test.du.Namespace == velerov1api.DefaultNamespace { - isDeletionTimestampSet := test.du.DeletionTimestamp != nil - err = r.client.Create(ctx, test.du) + + if !test.notCreateDU { + err = r.client.Create(context.Background(), test.du) + require.NoError(t, err) + } + + if test.needDelete { + err = r.client.Delete(context.Background(), test.du) require.NoError(t, err) - // because of the changes introduced by https://github.com/kubernetes-sigs/controller-runtime/commit/7a66d580c0c53504f5b509b45e9300cc18a1cc30 - // the fake client ignores the DeletionTimestamp when calling the Create(), - // so call Delete() here - if isDeletionTimestampSet { - err = r.client.Delete(ctx, test.du) - require.NoError(t, err) - } } if test.pod != nil { @@ -591,38 +627,41 @@ func TestReconcile(t *testing.T) { require.NoError(t, err) } - if test.removeNode { - err = r.kubeClient.CoreV1().Nodes().Delete(ctx, "fake-node", metav1.DeleteOptions{}) - require.NoError(t, err) - } - if test.dataMgr != nil { r.dataPathMgr = test.dataMgr } else { r.dataPathMgr = datapath.NewManager(1) } + if test.sportTime != nil { + r.cancelledDataUpload[test.du.Name] = test.sportTime.Time + } + if test.du.Spec.SnapshotType == fakeSnapshotType { - r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr}} + r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr, test.exposeErr, test.getExposeErr, test.getExposeNil}} } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} } - if !test.notCreateFSBR { - datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { - return &fakeDataUploadFSBR{ - du: test.du, - kubeClient: r.client, - clock: r.Clock, - initErr: test.fsBRInitErr, - startErr: test.fsBRStartErr, - } + + funcExclusiveUpdateDataUpload = exclusiveUpdateDataUpload + if test.needExclusiveUpdateError != nil { + funcExclusiveUpdateDataUpload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataUpload, func(*velerov2alpha1api.DataUpload)) (bool, error) { + return false, test.needExclusiveUpdateError } } - testCreateFsBR := false - if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { + datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return &fakeDataUploadFSBR{ + du: test.du, + kubeClient: r.client, + clock: r.Clock, + initErr: test.fsBRInitErr, + startErr: test.fsBRStartErr, + } + } + + if test.needCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { - testCreateFsBR = true _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } @@ -635,41 +674,46 @@ func TestReconcile(t *testing.T) { }, }) - assert.Equal(t, test.expectedRequeue, actualResult) - if test.expectedErrMsg == "" { - require.NoError(t, err) + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) } else { - require.ErrorContains(t, err, test.expectedErrMsg) + assert.NoError(t, err) } - du := velerov2alpha1api.DataUpload{} - err = r.client.Get(ctx, kbclient.ObjectKey{ - Name: test.du.Name, - Namespace: test.du.Namespace, - }, &du) - t.Logf("%s: \n %v \n", test.name, du) - // Assertions - if test.expected == nil { - require.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, test.expected.Status.Phase, du.Status.Phase) + if test.expectedResult != nil { + assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue) + assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) } - if test.expectedProcessed { - assert.False(t, du.Status.CompletionTimestamp.IsZero()) + if test.expected != nil || test.expectDeleted { + du := velerov2alpha1api.DataUpload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.du.Name, + Namespace: test.du.Namespace, + }, &du) + + if test.expectDeleted { + assert.True(t, apierrors.IsNotFound(err)) + } else { + require.NoError(t, err) + + assert.Equal(t, test.expected.Status.Phase, du.Status.Phase) + assert.Contains(t, du.Status.Message, test.expected.Status.Message) + assert.Equal(t, du.Finalizers, test.expected.Finalizers) + assert.Equal(t, du.Spec.Cancel, test.expected.Spec.Cancel) + } } - if !test.expectedProcessed { - assert.True(t, du.Status.CompletionTimestamp.IsZero()) - } - - if test.checkFunc != nil { - assert.True(t, test.checkFunc(du)) - } - - if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { + if !test.expectDataPath { assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) + } else { + assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) + } + + if test.expectCancelRecord { + assert.Contains(t, r.cancelledDataUpload, test.du.Name) + } else { + assert.Empty(t, r.cancelledDataUpload) } }) } @@ -719,7 +763,7 @@ func TestOnDataUploadProgress(t *testing.T) { { name: "failed to patch dataupload", du: dataUploadBuilder().Result(), - needErrs: []bool{false, false, false, true}, + needErrs: []bool{false, false, true, false}, }, } for _, test := range tests { @@ -992,7 +1036,7 @@ func TestTryCancelDataUpload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.tryCancelAcceptedDataUpload(ctx, test.dd, "") + r.tryCancelDataUpload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) @@ -1114,33 +1158,8 @@ func TestAttemptDataUploadResume(t *testing.T) { expectedError string }{ { - name: "accepted DataUpload in other node", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - cancelledDataUploads: []string{dataUploadName}, - acceptedDataUploads: []string{dataUploadName}, - }, - { - name: "accepted DataUpload in the current node", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).AcceptedByNode("node-1").Result(), - cancelledDataUploads: []string{dataUploadName}, - acceptedDataUploads: []string{dataUploadName}, - }, - { - name: "accepted DataUpload in the current node but canceled", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).AcceptedByNode("node-1").Cancel(true).Result(), - cancelledDataUploads: []string{dataUploadName}, - acceptedDataUploads: []string{dataUploadName}, - }, - { - name: "accepted DataUpload in the current node but update error", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).AcceptedByNode("node-1").Result(), - needErrs: []bool{false, false, true, false, false, false}, - acceptedDataUploads: []string{dataUploadName}, - }, - { - name: "prepared DataUpload", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - prepareddDataUploads: []string{dataUploadName}, + name: "Other DataUpload", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), }, { name: "InProgress DataUpload, not the current node", From b222b88c942652ef78f52f6ffc36de571de0e4ae Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 22 May 2025 14:56:15 +0800 Subject: [PATCH 2/2] dm controller refactor for cancel Signed-off-by: Lyndon-Li --- pkg/controller/data_download_controller.go | 40 +++++++++++------ pkg/controller/data_upload_controller.go | 50 ++++++++++++++-------- 2 files changed, 57 insertions(+), 33 deletions(-) diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index e6b3f8872..53d83d98e 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -434,10 +434,16 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na log.Info("Cleaning up exposed environment") r.restoreExposer.CleanUp(ctx, objRef) - original := dd.DeepCopy() - dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted - dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { + if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool { + if isDataDownloadInFinalState(dd) { + return false + } + + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + + return true + }); err != nil { log.WithError(err).Error("error updating data download status") } else { log.Infof("Data download is marked as %s", dd.Status.Phase) @@ -470,22 +476,28 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na var dd velerov2alpha1api.DataDownload if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get datadownload on cancel") - } else { - // cleans up any objects generated during the snapshot expose - r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd)) + return + } + // cleans up any objects generated during the snapshot expose + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd)) + + if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool { + if isDataDownloadInFinalState(dd) { + return false + } - original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled if dd.Status.StartTimestamp.IsZero() { dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating data download status") - } else { - r.metrics.RegisterDataDownloadCancel(r.nodeName) - delete(r.cancelledDataDownload, dd.Name) - } + + return true + }); err != nil { + log.WithError(err).Error("error updating data download status") + } else { + r.metrics.RegisterDataDownloadCancel(r.nodeName) + delete(r.cancelledDataDownload, dd.Name) } } diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index f9aa2fad4..6ab65f172 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -468,16 +468,21 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } // Update status to Completed with path & snapshot ID. - original := du.DeepCopy() - du.Status.Path = result.Backup.Source.ByPath - du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted - du.Status.SnapshotID = result.Backup.SnapshotID - du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - if result.Backup.EmptySnapshot { - du.Status.Message = "volume was empty so no data was upload" - } + if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool { + if isDataUploadInFinalState(du) { + return false + } - if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + du.Status.Path = result.Backup.Source.ByPath + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted + du.Status.SnapshotID = result.Backup.SnapshotID + du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + if result.Backup.EmptySnapshot { + du.Status.Message = "volume was empty so no data was upload" + } + + return true + }); err != nil { log.WithError(err).Error("error updating DataUpload status") } else { log.Info("Data upload completed") @@ -510,21 +515,28 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp du := &velerov2alpha1api.DataUpload{} if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on cancel") - } else { - // cleans up any objects generated during the snapshot expose - r.cleanUp(ctx, du, log) - original := du.DeepCopy() + return + } + // cleans up any objects generated during the snapshot expose + r.cleanUp(ctx, du, log) + + if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool { + if isDataUploadInFinalState(du) { + return false + } + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if du.Status.StartTimestamp.IsZero() { du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating DataUpload status") - } else { - r.metrics.RegisterDataUploadCancel(r.nodeName) - delete(r.cancelledDataUpload, du.Name) - } + + return true + }); err != nil { + log.WithError(err).Error("error updating DataUpload status") + } else { + r.metrics.RegisterDataUploadCancel(r.nodeName) + delete(r.cancelledDataUpload, du.Name) } }