Merge pull request #8952 from Lyndon-Li/dm-controller-refactor-for-cancel

DM controller refactor for cancel
pull/8968/head
lyndon-li 2025-05-28 15:09:33 +08:00 committed by GitHub
commit 7db3eeac58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1094 additions and 890 deletions

View File

@ -0,0 +1 @@
Fix issue #8534, refactor dm controllers to tolerate cancel request in more cases, e.g., node restart, node drain

View File

@ -165,3 +165,15 @@ func (d *DataDownloadBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time)
d.object.Status.AcceptedTimestamp = acceptedTimestamp d.object.Status.AcceptedTimestamp = acceptedTimestamp
return d return d
} }
// Finalizers sets the DataDownload's Finalizers.
func (d *DataDownloadBuilder) Finalizers(finalizers []string) *DataDownloadBuilder {
d.object.Finalizers = finalizers
return d
}
// Message sets the DataDownload's Message.
func (d *DataDownloadBuilder) Message(msg string) *DataDownloadBuilder {
d.object.Status.Message = msg
return d
}

View File

@ -168,3 +168,15 @@ func (d *DataUploadBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) *D
d.object.Status.AcceptedTimestamp = acceptedTimestamp d.object.Status.AcceptedTimestamp = acceptedTimestamp
return d return d
} }
// Finalizers sets the DataUpload's Finalizers.
func (d *DataUploadBuilder) Finalizers(finalizers []string) *DataUploadBuilder {
d.object.Finalizers = finalizers
return d
}
// Message sets the DataUpload's Message.
func (d *DataUploadBuilder) Message(msg string) *DataUploadBuilder {
d.object.Status.Message = msg
return d
}

View File

@ -56,36 +56,38 @@ import (
// DataDownloadReconciler reconciles a DataDownload object // DataDownloadReconciler reconciles a DataDownload object
type DataDownloadReconciler struct { type DataDownloadReconciler struct {
client client.Client client client.Client
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
mgr manager.Manager mgr manager.Manager
logger logrus.FieldLogger logger logrus.FieldLogger
Clock clock.WithTickerAndDelayedExecution Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer restoreExposer exposer.GenericRestoreExposer
nodeName string nodeName string
dataPathMgr *datapath.Manager dataPathMgr *datapath.Manager
restorePVCConfig nodeagent.RestorePVC restorePVCConfig nodeagent.RestorePVC
podResources corev1api.ResourceRequirements podResources corev1api.ResourceRequirements
preparingTimeout time.Duration preparingTimeout time.Duration
metrics *metrics.ServerMetrics metrics *metrics.ServerMetrics
cancelledDataDownload map[string]time.Time
} }
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
restorePVCConfig nodeagent.RestorePVC, podResources corev1api.ResourceRequirements, nodeName string, preparingTimeout time.Duration, restorePVCConfig nodeagent.RestorePVC, podResources corev1api.ResourceRequirements, nodeName string, preparingTimeout time.Duration,
logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{ return &DataDownloadReconciler{
client: client, client: client,
kubeClient: kubeClient, kubeClient: kubeClient,
mgr: mgr, mgr: mgr,
logger: logger.WithField("controller", "DataDownload"), logger: logger.WithField("controller", "DataDownload"),
Clock: &clock.RealClock{}, Clock: &clock.RealClock{},
nodeName: nodeName, nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig, restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr, dataPathMgr: dataPathMgr,
podResources: podResources, podResources: podResources,
preparingTimeout: preparingTimeout, preparingTimeout: preparingTimeout,
metrics: metrics, metrics: metrics,
cancelledDataDownload: make(map[string]time.Time),
} }
} }
@ -118,42 +120,90 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
if r.restoreExposer == nil {
return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log)
}
// Add finalizer
// Logic for clear resources when datadownload been deleted // Logic for clear resources when datadownload been deleted
if dd.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning if !isDataDownloadInFinalState(dd) {
if !isDataDownloadInFinalState(dd) && !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) { if !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dd *velerov2alpha1api.DataDownload) bool {
if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
return false
}
controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer) controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer)
})
if err != nil { return true
log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) }); err != nil {
log.WithError(err).Errorf("failed to add finalizer for dd %s/%s", dd.Namespace, dd.Name)
return ctrl.Result{}, err return ctrl.Result{}, err
} else if !succeeded { }
log.Warnf("failed to add finalizer for %s/%s and will requeue later", dd.Namespace, dd.Name)
return ctrl.Result{Requeue: true}, nil return ctrl.Result{}, nil
}
if !dd.DeletionTimestamp.IsZero() {
if !dd.Spec.Cancel {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase)
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = "Cancel datadownload because it is being deleted"
return true
}); err != nil {
log.WithError(err).Errorf("failed to set cancel flag for dd %s/%s", dd.Namespace, dd.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} }
} }
} else if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) { } else {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism delete(r.cancelledDataDownload, dd.Name)
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase)
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool { // put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status
if dataDownload.Spec.Cancel { // instead of intermediate state.
return false // remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up
// also in final status cr won't block the direct delete of the velero namespace
if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dd *velerov2alpha1api.DataDownload) bool {
if !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
return false
}
controllerutil.RemoveFinalizer(dd, DataUploadDownloadFinalizer)
return true
}); err != nil {
log.WithError(err).Error("error to remove finalizer")
return ctrl.Result{}, err
} }
dataDownload.Spec.Cancel = true return ctrl.Result{}, nil
dataDownload.Status.Message = "Cancel datadownload because it is being deleted" }
}
return true if dd.Spec.Cancel {
}); err != nil { if spotted, found := r.cancelledDataDownload[dd.Name]; !found {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) r.cancelledDataDownload[dd.Name] = r.Clock.Now()
return ctrl.Result{}, err } else {
delay := cancelDelayOthers
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
delay = cancelDelayInProgress
}
if time.Since(spotted) > delay {
log.Infof("Data download %s is canceled in Phase %s but not handled in rasonable time", dd.GetName(), dd.Status.Phase)
if r.tryCancelDataDownload(ctx, dd, "") {
delete(r.cancelledDataDownload, dd.Name)
}
return ctrl.Result{}, nil
}
} }
} }
@ -167,7 +217,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
accepted, err := r.acceptDataDownload(ctx, dd) accepted, err := r.acceptDataDownload(ctx, dd)
if err != nil { if err != nil {
return r.errorOut(ctx, dd, err, "error to accept the data download", log) return ctrl.Result{}, errors.Wrapf(err, "error accepting the data download %s", dd.Name)
} }
if !accepted { if !accepted {
@ -193,44 +243,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// And then only the controller who is in the same node could do the rest work. // And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposeParam) err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposeParam)
if err != nil { if err != nil {
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil { return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "getting DataUpload")
}
}
if isDataDownloadInFinalState(dd) {
log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
return ctrl.Result{}, nil
} else {
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
}
} }
log.Info("Restore is exposed") log.Info("Restore is exposed")
// we need to get CR again for it may canceled by datadownload controller on other
// nodes when doing expose action, if detectd cancel action we need to clear up the internal
// resources created by velero during backup.
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find datadownload")
return ctrl.Result{}, nil
}
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")
}
// we need to clean up resources as resources created in Expose it may later than cancel action or prepare time
// and need to clean up resources again
if isDataDownloadInFinalState(dd) {
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel { if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) r.tryCancelDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.AcceptedTimestamp != nil { } else if dd.Status.AcceptedTimestamp != nil {
if time.Since(dd.Status.AcceptedTimestamp.Time) >= r.preparingTimeout { if time.Since(dd.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
@ -240,7 +260,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared") log.Infof("Data download is prepared and should be processed by %s (%s)", dd.Status.Node, r.nodeName)
if dd.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
if dd.Spec.Cancel { if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
@ -259,13 +283,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err != nil { if err != nil {
return r.errorOut(ctx, dd, err, "restore exposer is not ready", log) return r.errorOut(ctx, dd, err, "restore exposer is not ready", log)
} else if result == nil { } else if result == nil {
log.Debug("Get empty restore exposer") return r.errorOut(ctx, dd, errors.New("no expose result is available for the current node"), "exposed snapshot is not ready", log)
return ctrl.Result{}, nil
} }
log.Info("Restore PVC is ready and creating data path routine") log.Info("Restore PVC is ready and creating data path routine")
// Need to first create file system BR and get data path instance then update data upload status // Need to first create file system BR and get data path instance then update data download status
callbacks := datapath.Callbacks{ callbacks := datapath.Callbacks{
OnCompleted: r.OnDataDownloadCompleted, OnCompleted: r.OnDataDownloadCompleted,
OnFailed: r.OnDataDownloadFailed, OnFailed: r.OnDataDownloadFailed,
@ -292,16 +315,30 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
} }
// Update status to InProgress // Update status to InProgress
original := dd.DeepCopy() terminated := false
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if isDataDownloadInFinalState(dd) {
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { terminated = true
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name) return false
}
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will data path close and retry", dd.Name)
r.closeDataPath(ctx, dd.Name) r.closeDataPath(ctx, dd.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} }
if terminated {
log.Warnf("datadownload %s is terminated during transition from prepared", dd.Name)
r.closeDataPath(ctx, dd.Name)
return ctrl.Result{}, nil
}
log.Info("Data download is marked as in progress") log.Info("Data download is marked as in progress")
if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil { if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil {
@ -313,45 +350,41 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel { if dd.Spec.Cancel {
log.Info("Data download is being canceled") if dd.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
log.Info("In progress data download is being canceled")
asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name) asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name)
if asyncBR == nil { if asyncBR == nil {
if r.nodeName == dd.Status.Node { r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else {
log.Info("Data path is not started in this node and will not canceled by current node")
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
// Update status to Canceling. // Update status to Canceling.
original := dd.DeepCopy() if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling if isDataDownloadInFinalState(dd) {
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { log.Warnf("datadownload %s is terminated, abort setting it to canceling", dd.Name)
log.WithError(err).Error("error updating data download status") return false
}
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling
return true
}); err != nil {
log.WithError(err).Error("error updating data download into canceling status")
return ctrl.Result{}, err return ctrl.Result{}, err
} }
asyncBR.Cancel() asyncBR.Cancel()
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
return ctrl.Result{}, nil
} else {
// put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status
// instead of intermediate state
// remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up
// also in final status cr won't block the direct delete of the velero namespace
if isDataDownloadInFinalState(dd) && controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
original := dd.DeepCopy()
controllerutil.RemoveFinalizer(dd, DataUploadDownloadFinalizer)
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error to remove finalizer")
}
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
return ctrl.Result{}, nil
} }
func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
@ -401,10 +434,16 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
log.Info("Cleaning up exposed environment") log.Info("Cleaning up exposed environment")
r.restoreExposer.CleanUp(ctx, objRef) r.restoreExposer.CleanUp(ctx, objRef)
original := dd.DeepCopy() if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted if isDataDownloadInFinalState(dd) {
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} return false
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { }
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
return true
}); err != nil {
log.WithError(err).Error("error updating data download status") log.WithError(err).Error("error updating data download status")
} else { } else {
log.Infof("Data download is marked as %s", dd.Status.Phase) log.Infof("Data download is marked as %s", dd.Status.Phase)
@ -437,29 +476,34 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
var dd velerov2alpha1api.DataDownload var dd velerov2alpha1api.DataDownload
if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
log.WithError(getErr).Warn("Failed to get datadownload on cancel") log.WithError(getErr).Warn("Failed to get datadownload on cancel")
} else { return
// cleans up any objects generated during the snapshot expose }
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd)) // cleans up any objects generated during the snapshot expose
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd))
if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
if isDataDownloadInFinalState(dd) {
return false
}
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() { if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
} }
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status") return true
} else { }); err != nil {
r.metrics.RegisterDataDownloadCancel(r.nodeName) log.WithError(err).Error("error updating data download status")
} } else {
r.metrics.RegisterDataDownloadCancel(r.nodeName)
delete(r.cancelledDataDownload, dd.Name)
} }
} }
func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) bool {
log := r.logger.WithField("datadownload", dd.Name) log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Accepted data download is canceled") succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dataDownload.Status.StartTimestamp.IsZero() { if dataDownload.Status.StartTimestamp.IsZero() {
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
@ -473,31 +517,29 @@ func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Conte
if err != nil { if err != nil {
log.WithError(err).Error("error updating datadownload status") log.WithError(err).Error("error updating datadownload status")
return return false
} else if !succeeded { } else if !succeeded {
log.Warn("conflict in updating datadownload status and will try it again later") log.Warn("conflict in updating datadownload status and will try it again later")
return return false
} }
// success update // success update
r.metrics.RegisterDataDownloadCancel(r.nodeName) r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
log.Warn("data download is canceled")
return true
} }
func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
log := r.logger.WithField("datadownload", ddName) log := r.logger.WithField("datadownload", ddName)
var dd velerov2alpha1api.DataDownload if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: ddName}, log, func(dd *velerov2alpha1api.DataDownload) bool {
if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil { dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
log.WithError(err).Warn("Failed to get data download on progress") return true
return }); err != nil {
} log.WithError(err).Error("Failed to update progress")
original := dd.DeepCopy()
dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Failed to update restore snapshot progress")
} }
} }
@ -509,7 +551,19 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
gp := kube.NewGenericEventPredicate(func(object client.Object) bool { gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
dd := object.(*velerov2alpha1api.DataDownload) dd := object.(*velerov2alpha1api.DataDownload)
return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted) if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
return true
}
if dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
return true
}
if isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero() {
return true
}
return false
}) })
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataDownload), r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{ s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataDownload), r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{
Predicates: []predicate.Predicate{gp}, Predicates: []predicate.Predicate{gp},
@ -568,10 +622,17 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
if pod.Status.Phase == corev1api.PodRunning { if pod.Status.Phase == corev1api.PodRunning {
log.Info("Preparing data download") log.Info("Preparing data download")
// we don't expect anyone else update the CR during the Prepare process if err = UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log,
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload) func(dd *velerov2alpha1api.DataDownload) bool {
if err != nil || !updated { if isDataDownloadInFinalState(dd) {
log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload") log.Warnf("datadownload %s is terminated, abort setting it to prepared", dd.Name)
return false
}
r.prepareDataDownload(dd)
return true
}); err != nil {
log.WithError(err).Warn("failed to update dataudownload, prepare will halt for this dataudownload")
return []reconcile.Request{} return []reconcile.Request{}
} }
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
@ -618,13 +679,19 @@ func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha
} }
func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error { func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error {
log.Infof("update data download status to %v", dd.Status.Phase) log.Info("update data download status to Failed")
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil { if patchErr := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
if isDataDownloadInFinalState(dd) {
return false
}
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
return true
}); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status") log.WithError(patchErr).Error("error updating DataDownload status")
} else { } else {
r.metrics.RegisterDataDownloadFailure(r.nodeName) r.metrics.RegisterDataDownloadFailure(r.nodeName)
@ -647,7 +714,7 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()} datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}
} }
succeeded, err := r.exclusiveUpdateDataDownload(ctx, updated, updateFunc) succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, updated, updateFunc)
if err != nil { if err != nil {
return false, err return false, err
@ -667,7 +734,7 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
log := r.logger.WithField("DataDownload", dd.Name) log := r.logger.WithField("DataDownload", dd.Name)
log.Info("Timeout happened for preparing datadownload") log.Info("Timeout happened for preparing datadownload")
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download" dd.Status.Message = "timeout on preparing data download"
}) })
@ -678,7 +745,7 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
} }
if !succeeded { if !succeeded {
log.Warn("Dataupload has been updated by others") log.Warn("Datadownload has been updated by others")
return return
} }
@ -689,16 +756,18 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
log.Info("Dataupload has been cleaned up") log.Info("Datadownload has been cleaned up")
r.metrics.RegisterDataDownloadFailure(r.nodeName) r.metrics.RegisterDataDownloadFailure(r.nodeName)
} }
func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, var funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload
func exclusiveUpdateDataDownload(ctx context.Context, cli client.Client, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) { updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updateFunc(dd) updateFunc(dd)
err := r.client.Update(ctx, dd) err := cli.Update(ctx, dd)
if err == nil { if err == nil {
return true, nil return true, nil
@ -805,7 +874,7 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted
} }
func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataDownload) bool) error { func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataDownload) bool) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
dd := &velerov2alpha1api.DataDownload{} dd := &velerov2alpha1api.DataDownload{}
if err := client.Get(ctx, namespacedName, dd); err != nil { if err := client.Get(ctx, namespacedName, dd); err != nil {
@ -839,11 +908,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
for i := range dataDownloads.Items { for i := range dataDownloads.Items {
dd := &dataDownloads.Items[i] dd := &dataDownloads.Items[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
if dd.Status.Node != r.nodeName { if dd.Status.Node != r.nodeName {
logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node) logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node)
continue continue
@ -870,25 +935,12 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
return true return true
}) })
if err != nil { if err != nil {
logger.WithError(errors.WithStack(err)).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") logger.WithError(errors.WithStack(err)).WithError(errors.WithStack(err)).Error("Failed to trigger datadownload cancel")
}
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")
err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = "Datadownload is in Accepted status during the node-agent starting, mark it as cancel"
return true
})
if err != nil {
r.logger.WithField("datadownload", dd.GetName()).WithError(err).Errorf("Failed to trigger dataupload cancel")
} }
} else {
// the Prepared CR could be still handled by datadownload controller after node-agent restart
// the accepted CR may also suvived from node-agent restart as long as the intermediate objects are all done
logger.WithField("datadownload", dd.GetName()).Infof("find a datadownload with status %s", dd.Status.Phase)
} }
} }
@ -914,7 +966,7 @@ func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context,
OnProgress: r.OnDataDownloadProgress, OnProgress: r.OnDataDownloadProgress,
} }
asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log) asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log)
if err != nil { if err != nil {
return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name) return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name)
} }

View File

@ -37,7 +37,6 @@ import (
clientgofake "k8s.io/client-go/kubernetes/fake" clientgofake "k8s.io/client-go/kubernetes/fake"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client" kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
@ -68,7 +67,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
PV: "test-pv", PV: "test-pv",
PVC: "test-pvc", PVC: "test-pvc",
Namespace: "test-ns", Namespace: "test-ns",
}).NodeOS(velerov2alpha1api.NodeOS("linux")) })
} }
func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
@ -171,89 +170,204 @@ func TestDataDownloadReconcile(t *testing.T) {
node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result() node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result()
tests := []struct { tests := []struct {
name string name string
dd *velerov2alpha1api.DataDownload dd *velerov2alpha1api.DataDownload
targetPVC *corev1api.PersistentVolumeClaim notCreateDD bool
dataMgr *datapath.Manager targetPVC *corev1api.PersistentVolumeClaim
needErrs []bool dataMgr *datapath.Manager
needCreateFSBR bool needErrs []bool
isExposeErr bool needCreateFSBR bool
isGetExposeErr bool needDelete bool
isPeekExposeErr bool sportTime *metav1.Time
isNilExposer bool isExposeErr bool
isFSBRInitErr bool isGetExposeErr bool
isFSBRRestoreErr bool isGetExposeNil bool
notNilExpose bool isPeekExposeErr bool
notMockCleanUp bool isNilExposer bool
mockInit bool notNilExpose bool
mockInitErr error notMockCleanUp bool
mockStart bool mockInit bool
mockStartErr error mockInitErr error
mockCancel bool mockStart bool
mockClose bool mockStartErr error
expected *velerov2alpha1api.DataDownload mockCancel bool
expectedStatusMsg string mockClose bool
checkFunc func(du velerov2alpha1api.DataDownload) bool needExclusiveUpdateError error
expectedResult *ctrl.Result expected *velerov2alpha1api.DataDownload
expectDeleted bool
expectCancelRecord bool
expectedResult *ctrl.Result
expectedErr string
expectDataPath bool
}{ }{
{ {
name: "Unknown data download status", name: "dd not found",
dd: dataDownloadBuilder().Phase("Unknown").Cancel(true).Result(), dd: dataDownloadBuilder().Result(),
notCreateDD: true,
},
{
name: "dd not created in velero default namespace",
dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(),
},
{
name: "get dd fail",
dd: dataDownloadBuilder().Result(),
needErrs: []bool{true, false, false, false},
expectedErr: "Get error",
},
{
name: "dd is not for built-in dm",
dd: dataDownloadBuilder().DataMover("other").Result(),
},
{
name: "add finalizer to dd",
dd: dataDownloadBuilder().Result(),
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "add finalizer to dd failed",
dd: dataDownloadBuilder().Result(),
needErrs: []bool{false, false, true, false},
expectedErr: "error updating datadownload velero/datadownload-1: Update error",
},
{
name: "dd is under deletion",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
needDelete: true,
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
},
{
name: "dd is under deletion but cancel failed",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
needErrs: []bool{false, false, true, false},
needDelete: true,
expectedErr: "error updating datadownload velero/datadownload-1: Update error",
},
{
name: "dd is under deletion and in terminal state",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
sportTime: &metav1.Time{Time: time.Now()},
needDelete: true,
expectDeleted: true,
},
{
name: "dd is under deletion and in terminal state, but remove finalizer failed",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
needErrs: []bool{false, false, true, false},
needDelete: true,
expectedErr: "error updating datadownload velero/datadownload-1: Update error",
},
{
name: "delay cancel negative for others",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
sportTime: &metav1.Time{Time: time.Now()},
expectCancelRecord: true,
},
{
name: "delay cancel negative for inProgress",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)},
expectCancelRecord: true,
},
{
name: "delay cancel affirmative for others",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)},
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "delay cancel affirmative for inProgress",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "delay cancel failed",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
needErrs: []bool{false, false, true, false},
sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
expectCancelRecord: true,
},
{
name: "Unknown data download status",
dd: dataDownloadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "new dd but no target PVC",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true},
},
{
name: "new dd but accept failed",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needExclusiveUpdateError: errors.New("exclusive-update-error"),
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "error accepting the data download datadownload-1: exclusive-update-error",
},
{
name: "dd is cancel on accepted",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
}, },
{ {
name: "Cancel data downloand in progress and patch data download error", name: "dd is accepted but setup expose param failed",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(), dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, false, true}, expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Phase(velerov2alpha1api.DataDownloadPhaseFailed).Message("failed to set exposer parameters").Result(),
needCreateFSBR: true, expectedErr: "no appropriate node to run datadownload velero/datadownload-1: node with OS xxx doesn't exist",
expectedStatusMsg: "Patch error",
}, },
{ {
name: "Cancel data downloand in progress with empty FSBR", name: "dd expose failed",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(), dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
mockCancel: true, isExposeErr: true,
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Message("error to expose snapshot").Result(),
expectedErr: "Error to expose restore exposer",
}, },
{ {
name: "Cancel data downloand in progress with create FSBR", name: "dd succeeds for accepted",
dd: func() *velerov2alpha1api.DataDownload { dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result() targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
dd.Status.Node = "test-node" expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
return dd },
}(), {
name: "prepare timeout on accepted",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Finalizers([]string{DataUploadDownloadFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseFailed).Message("timeout on preparing data download").Result(),
},
{
name: "peek error on accepted",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
isPeekExposeErr: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Message("found a datadownload velero/datadownload-1 with expose error: fake-peek-error. mark it as cancel").Result(),
},
{
name: "cancel on prepared",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Cancel(true).Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "Failed to get restore expose on prepared",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needCreateFSBR: true, isGetExposeErr: true,
mockCancel: true, expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("restore exposer is not ready").Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Result(), expectedErr: "Error to get restore exposer",
}, },
{ {
name: "Cancel data downloand in progress without create FSBR", name: "Get nil restore expose on prepared",
dd: func() *velerov2alpha1api.DataDownload { dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
dd.Status.Node = "test-node"
return dd
}(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needCreateFSBR: false, isGetExposeNil: true,
mockCancel: true, expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("exposed snapshot is not ready").Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), expectedErr: "no expose result is available for the current node",
},
{
name: "Cancel data downloand in progress in different node",
dd: func() *velerov2alpha1api.DataDownload {
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
dd.Status.Node = "different-node"
return dd
}(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needCreateFSBR: false,
mockCancel: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
}, },
{ {
name: "Error in data path is concurrent limited", name: "Error in data path is concurrent limited",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
dataMgr: datapath.NewManager(0), dataMgr: datapath.NewManager(0),
notNilExpose: true, notNilExpose: true,
@ -261,158 +375,104 @@ func TestDataDownloadReconcile(t *testing.T) {
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
}, },
{ {
name: "data path init error", name: "data path init error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
mockInit: true, mockInit: true,
mockInitErr: errors.New("fake-data-path-init-error"), mockInitErr: errors.New("fake-data-path-init-error"),
mockClose: true, mockClose: true,
notNilExpose: true, notNilExpose: true,
expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error", expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error initializing data path").Result(),
expectedErr: "error initializing asyncBR: fake-data-path-init-error",
}, },
{ {
name: "Unable to update status to in progress for data download", name: "Unable to update status to in progress for data download",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, false, true}, needErrs: []bool{false, false, true, false},
mockInit: true, mockInit: true,
mockClose: true, mockClose: true,
notNilExpose: true, notNilExpose: true,
notMockCleanUp: true, notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
}, },
{ {
name: "data path start error", name: "data path start error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
mockInit: true, mockInit: true,
mockStart: true, mockStart: true,
mockStartErr: errors.New("fake-data-path-start-error"), mockStartErr: errors.New("fake-data-path-start-error"),
mockClose: true, mockClose: true,
notNilExpose: true, notNilExpose: true,
expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error starting data path").Result(),
expectedErr: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error",
}, },
{ {
name: "accept DataDownload error", name: "Prepare succeeds",
dd: dataDownloadBuilder().Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, true, false}, mockInit: true,
expectedStatusMsg: "Update error", mockStart: true,
notNilExpose: true,
notMockCleanUp: true,
expectDataPath: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
}, },
{ {
name: "Not create target pvc", name: "In progress dd is not handled by the current node",
dd: dataDownloadBuilder().Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
}, },
{ {
name: "Uninitialized dataDownload", name: "In progress dd is not set as cancel",
dd: dataDownloadBuilder().Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
isNilExposer: true,
expectedStatusMsg: "uninitialized generic exposer",
}, },
{ {
name: "DataDownload not created in velero default namespace", name: "Cancel data downloand in progress with empty FSBR",
dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
}, },
{ {
name: "Failed to get dataDownload", name: "Cancel data downloand in progress and patch data download error",
dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), needErrs: []bool{false, false, true, false},
needErrs: []bool{true, false, false, false}, needCreateFSBR: true,
expectedStatusMsg: "Get error", expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "error updating datadownload velero/datadownload-1: Update error",
expectCancelRecord: true,
expectDataPath: true,
}, },
{ {
name: "Unsupported dataDownload type", name: "Cancel data downloand in progress succeeds",
dd: dataDownloadBuilder().DataMover("Unsuppoorted type").Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), needCreateFSBR: true,
}, mockCancel: true,
{ expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
name: "Restore is exposed", expectDataPath: true,
dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSLinux).Result(), expectCancelRecord: true,
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
},
{
name: "Expected node doesn't exist",
dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSWindows).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectedStatusMsg: "no appropriate node to run datadownload",
},
{
name: "Get empty restore exposer",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
},
{
name: "Failed to get restore exposer",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectedStatusMsg: "Error to get restore exposer",
isGetExposeErr: true,
},
{
name: "Error to start restore expose",
dd: dataDownloadBuilder().Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectedStatusMsg: "Error to expose restore exposer",
isExposeErr: true,
},
{
name: "prepare timeout",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
},
{
name: "peek error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
isPeekExposeErr: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "dataDownload with enabled cancel",
dd: func() *velerov2alpha1api.DataDownload {
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result()
controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer)
dd.DeletionTimestamp = &metav1.Time{Time: time.Now()}
return dd
}(),
checkFunc: func(du velerov2alpha1api.DataDownload) bool {
return du.Spec.Cancel
},
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
},
{
name: "dataDownload with remove finalizer and should not be retrieved",
dd: func() *velerov2alpha1api.DataDownload {
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Cancel(true).Result()
controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer)
dd.DeletionTimestamp = &metav1.Time{Time: time.Now()}
return dd
}(),
checkFunc: func(dd velerov2alpha1api.DataDownload) bool {
return !controllerutil.ContainsFinalizer(&dd, DataUploadDownloadFinalizer)
},
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
objs := []runtime.Object{daemonSet, node} objs := []runtime.Object{daemonSet, node}
if test.targetPVC != nil { if test.targetPVC != nil {
objs = append(objs, test.targetPVC) objs = append(objs, test.targetPVC)
} }
r, err := initDataDownloadReconciler(objs, test.needErrs...) r, err := initDataDownloadReconciler(objs, test.needErrs...)
require.NoError(t, err) require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{})
if test.targetPVC != nil {
r.client.Delete(ctx, test.targetPVC, &kbclient.DeleteOptions{})
}
}()
ctx := context.Background() if !test.notCreateDD {
if test.dd.Namespace == velerov1api.DefaultNamespace { err = r.client.Create(context.Background(), test.dd)
err = r.client.Create(ctx, test.dd) require.NoError(t, err)
}
if test.needDelete {
err = r.client.Delete(context.Background(), test.dd)
require.NoError(t, err) require.NoError(t, err)
} }
@ -422,6 +482,17 @@ func TestDataDownloadReconcile(t *testing.T) {
r.dataPathMgr = datapath.NewManager(1) r.dataPathMgr = datapath.NewManager(1)
} }
if test.sportTime != nil {
r.cancelledDataDownload[test.dd.Name] = test.sportTime.Time
}
funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload
if test.needExclusiveUpdateError != nil {
funcExclusiveUpdateDataDownload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataDownload, func(*velerov2alpha1api.DataDownload)) (bool, error) {
return false, test.needExclusiveUpdateError
}
}
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t) asyncBR := datapathmockes.NewAsyncBR(t)
@ -444,7 +515,7 @@ func TestDataDownloadReconcile(t *testing.T) {
return asyncBR return asyncBR
} }
if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose { if test.isExposeErr || test.isGetExposeErr || test.isGetExposeNil || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose {
if test.isNilExposer { if test.isNilExposer {
r.restoreExposer = nil r.restoreExposer = nil
} else { } else {
@ -458,6 +529,8 @@ func TestDataDownloadReconcile(t *testing.T) {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil) ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil)
} else if test.isGetExposeErr { } else if test.isGetExposeErr {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer")) ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer"))
} else if test.isGetExposeNil {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
} else if test.isPeekExposeErr { } else if test.isPeekExposeErr {
ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error")) ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error"))
} }
@ -485,48 +558,47 @@ func TestDataDownloadReconcile(t *testing.T) {
}, },
}) })
if test.expectedStatusMsg != "" { if test.expectedErr != "" {
require.ErrorContains(t, err, test.expectedStatusMsg) assert.EqualError(t, err, test.expectedErr)
} else { } else {
require.NoError(t, err) assert.NoError(t, err)
} }
require.NotNil(t, actualResult)
if test.expectedResult != nil { if test.expectedResult != nil {
assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue) assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue)
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
} }
dd := velerov2alpha1api.DataDownload{} if test.expected != nil || test.expectDeleted {
err = r.client.Get(ctx, kbclient.ObjectKey{ dd := velerov2alpha1api.DataDownload{}
Name: test.dd.Name, err = r.client.Get(ctx, kbclient.ObjectKey{
Namespace: test.dd.Namespace, Name: test.dd.Name,
}, &dd) Namespace: test.dd.Namespace,
}, &dd)
if test.expected != nil { if test.expectDeleted {
require.NoError(t, err) assert.True(t, apierrors.IsNotFound(err))
assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase)
}
if test.isGetExposeErr {
assert.Contains(t, dd.Status.Message, test.expectedStatusMsg)
}
if test.dd.Namespace == velerov1api.DefaultNamespace {
if controllerutil.ContainsFinalizer(test.dd, DataUploadDownloadFinalizer) {
assert.True(t, true, apierrors.IsNotFound(err)) //nolint:testifylint //FIXME
} else { } else {
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase)
assert.Contains(t, dd.Status.Message, test.expected.Status.Message)
assert.Equal(t, dd.Finalizers, test.expected.Finalizers)
assert.Equal(t, dd.Spec.Cancel, test.expected.Spec.Cancel)
} }
} else {
assert.True(t, true, apierrors.IsNotFound(err)) //nolint:testifylint //FIXME
} }
if !test.needCreateFSBR { if !test.expectDataPath {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name))
} else {
assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name))
} }
t.Logf("%s: \n %v \n", test.name, dd) if test.expectCancelRecord {
assert.Contains(t, r.cancelledDataDownload, test.dd.Name)
} else {
assert.Empty(t, r.cancelledDataDownload)
}
}) })
} }
} }
@ -661,7 +733,7 @@ func TestOnDataDownloadProgress(t *testing.T) {
{ {
name: "failed to patch datadownload", name: "failed to patch datadownload",
dd: dataDownloadBuilder().Result(), dd: dataDownloadBuilder().Result(),
needErrs: []bool{false, false, false, true}, needErrs: []bool{false, false, true, false},
}, },
} }
for _, test := range tests { for _, test := range tests {
@ -883,7 +955,7 @@ func TestTryCancelDataDownload(t *testing.T) {
err = r.client.Create(ctx, test.dd) err = r.client.Create(ctx, test.dd)
require.NoError(t, err) require.NoError(t, err)
r.tryCancelAcceptedDataDownload(ctx, test.dd, "") r.tryCancelDataDownload(ctx, test.dd, "")
if test.expectedErr == "" { if test.expectedErr == "" {
assert.NoError(t, err) assert.NoError(t, err)
@ -1010,33 +1082,12 @@ func TestAttemptDataDownloadResume(t *testing.T) {
expectedError string expectedError string
}{ }{
{ {
name: "accepted DataDownload with no dd label", name: "Other DataDownload",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
cancelledDataDownloads: []string{dataDownloadName},
acceptedDataDownloads: []string{dataDownloadName},
}, },
{ {
name: "accepted DataDownload in the current node", name: "Other DataDownload",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).AcceptedByNode("node-1").Result(), dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
cancelledDataDownloads: []string{dataDownloadName},
acceptedDataDownloads: []string{dataDownloadName},
},
{
name: "accepted DataDownload with dd label but is canceled",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Cancel(true).AcceptedByNode("node-1").Result(),
acceptedDataDownloads: []string{dataDownloadName},
cancelledDataDownloads: []string{dataDownloadName},
},
{
name: "accepted DataDownload with dd label but cancel fail",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).AcceptedByNode("node-1").Result(),
needErrs: []bool{false, false, true, false, false, false},
acceptedDataDownloads: []string{dataDownloadName},
},
{
name: "prepared DataDownload",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
prepareddDataDownloads: []string{dataDownloadName},
}, },
{ {
name: "InProgress DataDownload, not the current node", name: "InProgress DataDownload, not the current node",

View File

@ -59,6 +59,8 @@ const (
dataUploadDownloadRequestor = "snapshot-data-upload-download" dataUploadDownloadRequestor = "snapshot-data-upload-download"
DataUploadDownloadFinalizer = "velero.io/data-upload-download-finalizer" DataUploadDownloadFinalizer = "velero.io/data-upload-download-finalizer"
preparingMonitorFrequency = time.Minute preparingMonitorFrequency = time.Minute
cancelDelayInProgress = time.Hour
cancelDelayOthers = time.Minute * 5
) )
// DataUploadReconciler reconciles a DataUpload object // DataUploadReconciler reconciles a DataUpload object
@ -77,6 +79,7 @@ type DataUploadReconciler struct {
podResources corev1api.ResourceRequirements podResources corev1api.ResourceRequirements
preparingTimeout time.Duration preparingTimeout time.Duration
metrics *metrics.ServerMetrics metrics *metrics.ServerMetrics
cancelledDataUpload map[string]time.Time
} }
func NewDataUploadReconciler( func NewDataUploadReconciler(
@ -109,12 +112,13 @@ func NewDataUploadReconciler(
log, log,
), ),
}, },
dataPathMgr: dataPathMgr, dataPathMgr: dataPathMgr,
loadAffinity: loadAffinity, loadAffinity: loadAffinity,
backupPVCConfig: backupPVCConfig, backupPVCConfig: backupPVCConfig,
podResources: podResources, podResources: podResources,
preparingTimeout: preparingTimeout, preparingTimeout: preparingTimeout,
metrics: metrics, metrics: metrics,
cancelledDataUpload: make(map[string]time.Time),
} }
} }
@ -144,52 +148,104 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
// Logic for clear resources when dataupload been deleted
if !isDataUploadInFinalState(du) {
if !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
if controllerutil.ContainsFinalizer(dataUpload, DataUploadDownloadFinalizer) {
return false
}
controllerutil.AddFinalizer(dataUpload, DataUploadDownloadFinalizer)
return true
}); err != nil {
log.WithError(err).Errorf("failed to add finalizer for du %s/%s", du.Namespace, du.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if !du.DeletionTimestamp.IsZero() {
if !du.Spec.Cancel {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase)
if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = "Cancel dataupload because it is being deleted"
return true
}); err != nil {
log.WithError(err).Errorf("failed to set cancel flag for du %s/%s", du.Namespace, du.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
} else {
delete(r.cancelledDataUpload, du.Name)
// put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status
// instead of intermediate state.
// remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up
// also in final status cr won't block the direct delete of the velero namespace
if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(du *velerov2alpha1api.DataUpload) bool {
if !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
return false
}
controllerutil.RemoveFinalizer(du, DataUploadDownloadFinalizer)
return true
}); err != nil {
log.WithError(err).Error("error to remove finalizer")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
if du.Spec.Cancel {
if spotted, found := r.cancelledDataUpload[du.Name]; !found {
r.cancelledDataUpload[du.Name] = r.Clock.Now()
} else {
delay := cancelDelayOthers
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
delay = cancelDelayInProgress
}
if time.Since(spotted) > delay {
log.Infof("Data upload %s is canceled in Phase %s but not handled in reasonable time", du.GetName(), du.Status.Phase)
if r.tryCancelDataUpload(ctx, du, "") {
delete(r.cancelledDataUpload, du.Name)
}
return ctrl.Result{}, nil
}
}
}
ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
if !ok { if !ok {
return r.errorOut(ctx, du, errors.Errorf("%s type of snapshot exposer is not exist", du.Spec.SnapshotType), "not exist type of exposer", log) return r.errorOut(ctx, du, errors.Errorf("%s type of snapshot exposer is not exist", du.Spec.SnapshotType), "not exist type of exposer", log)
} }
// Logic for clear resources when dataupload been deleted
if du.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning
if !isDataUploadInFinalState(du) && !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) {
controllerutil.AddFinalizer(du, DataUploadDownloadFinalizer)
})
if err != nil {
log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), du.Namespace, du.Name)
return ctrl.Result{}, err
} else if !succeeded {
log.Warnf("failed to add finalizer for %s/%s and will requeue later", du.Namespace, du.Name)
return ctrl.Result{Requeue: true}, nil
}
}
} else if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) && !du.Spec.Cancel && !isDataUploadInFinalState(du) {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase)
if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = "Cancel dataupload because it is being deleted"
return true
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), du.Namespace, du.Name)
return ctrl.Result{}, err
}
}
if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew { if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew {
log.Info("Data upload starting") log.Info("Data upload starting")
accepted, err := r.acceptDataUpload(ctx, du) accepted, err := r.acceptDataUpload(ctx, du)
if err != nil { if err != nil {
return r.errorOut(ctx, du, err, "error to accept the data upload", log) return ctrl.Result{}, errors.Wrapf(err, "error accepting the data upload %s", du.Name)
} }
if !accepted { if !accepted {
@ -213,48 +269,15 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// but the pod maybe is not in the same node of the current controller, so we need to return it here. // but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work. // And then only the controller who is in the same node could do the rest work.
if err := ep.Expose(ctx, getOwnerObject(du), exposeParam); err != nil { if err := ep.Expose(ctx, getOwnerObject(du), exposeParam); err != nil {
if err := r.client.Get(ctx, req.NamespacedName, du); err != nil { return r.errorOut(ctx, du, err, "error exposing snapshot", log)
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "getting DataUpload")
}
}
if isDataUploadInFinalState(du) {
log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err)
r.cleanUp(ctx, du, log)
return ctrl.Result{}, nil
} else {
return r.errorOut(ctx, du, err, "error to expose snapshot", log)
}
} }
log.Info("Snapshot is exposed") log.Info("Snapshot is exposed")
// we need to get CR again for it may canceled by dataupload controller on other
// nodes when doing expose action, if detectd cancel action we need to clear up the internal
// resources created by velero during backup.
if err := r.client.Get(ctx, req.NamespacedName, du); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find DataUpload")
return ctrl.Result{}, nil
}
return ctrl.Result{}, errors.Wrap(err, "getting DataUpload")
}
// we need to clean up resources as resources created in Expose it may later than cancel action or prepare time
// and need to clean up resources again
if isDataUploadInFinalState(du) {
r.cleanUp(ctx, du, log)
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
if du.Spec.Cancel { if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil {
// we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action r.tryCancelDataUpload(ctx, du, fmt.Sprintf("found a du %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr))
// we could retry when the CR requeue in periodcally
log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase)
r.tryCancelAcceptedDataUpload(ctx, du, "")
} else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil {
r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr))
log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr)
} else if du.Status.AcceptedTimestamp != nil { } else if du.Status.AcceptedTimestamp != nil {
if time.Since(du.Status.AcceptedTimestamp.Time) >= r.preparingTimeout { if time.Since(du.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
@ -264,9 +287,14 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { } else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
log.Info("Data upload is prepared") log.Infof("Data upload is prepared and should be processed by %s (%s)", du.Status.Node, r.nodeName)
if du.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
if du.Spec.Cancel { if du.Spec.Cancel {
log.Info("Prepared data upload is being canceled")
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
@ -281,8 +309,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil { if err != nil {
return r.errorOut(ctx, du, err, "exposed snapshot is not ready", log) return r.errorOut(ctx, du, err, "exposed snapshot is not ready", log)
} else if res == nil { } else if res == nil {
log.Debug("Get empty exposer") return r.errorOut(ctx, du, errors.New("no expose result is available for the current node"), "exposed snapshot is not ready", log)
return ctrl.Result{}, nil
} }
if res.ByPod.NodeOS == nil { if res.ByPod.NodeOS == nil {
@ -318,17 +345,31 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
} }
// Update status to InProgress // Update status to InProgress
original := du.DeepCopy() terminated := false
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if isDataUploadInFinalState(du) {
du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS) terminated = true
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { return false
}
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS)
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name)
r.closeDataPath(ctx, du.Name) r.closeDataPath(ctx, du.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} }
if terminated {
log.Warnf("dataupload %s is terminated during transition from prepared", du.Name)
r.closeDataPath(ctx, du.Name)
return ctrl.Result{}, nil
}
log.Info("Data upload is marked as in progress") log.Info("Data upload is marked as in progress")
if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil { if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil {
@ -340,45 +381,39 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
log.Info("Data upload is in progress")
if du.Spec.Cancel { if du.Spec.Cancel {
log.Info("Data upload is being canceled") if du.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
log.Info("In progress data upload is being canceled")
asyncBR := r.dataPathMgr.GetAsyncBR(du.Name) asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
if asyncBR == nil { if asyncBR == nil {
if du.Status.Node == r.nodeName { r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
} else {
log.Info("Data path is not started in this node and will not canceled by current node")
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
// Update status to Canceling // Update status to Canceling
original := du.DeepCopy() if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling if isDataUploadInFinalState(du) {
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { log.Warnf("dataupload %s is terminated, abort setting it to canceling", du.Name)
return false
}
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling
return true
}); err != nil {
log.WithError(err).Error("error updating data upload into canceling status") log.WithError(err).Error("error updating data upload into canceling status")
return ctrl.Result{}, err return ctrl.Result{}, err
} }
asyncBR.Cancel() asyncBR.Cancel()
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
return ctrl.Result{}, nil
} else {
// put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status
// instead of intermediate state.
// remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up
// also in final status cr won't block the direct delete of the velero namespace
if isDataUploadInFinalState(du) && controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
original := du.DeepCopy()
controllerutil.RemoveFinalizer(du, DataUploadDownloadFinalizer)
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error to remove finalizer")
}
}
return ctrl.Result{}, nil
} }
return ctrl.Result{}, nil
} }
func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
@ -433,16 +468,21 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
} }
// Update status to Completed with path & snapshot ID. // Update status to Completed with path & snapshot ID.
original := du.DeepCopy() if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
du.Status.Path = result.Backup.Source.ByPath if isDataUploadInFinalState(du) {
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted return false
du.Status.SnapshotID = result.Backup.SnapshotID }
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if result.Backup.EmptySnapshot {
du.Status.Message = "volume was empty so no data was upload"
}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { du.Status.Path = result.Backup.Source.ByPath
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
du.Status.SnapshotID = result.Backup.SnapshotID
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if result.Backup.EmptySnapshot {
du.Status.Message = "volume was empty so no data was upload"
}
return true
}); err != nil {
log.WithError(err).Error("error updating DataUpload status") log.WithError(err).Error("error updating DataUpload status")
} else { } else {
log.Info("Data upload completed") log.Info("Data upload completed")
@ -475,27 +515,34 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
du := &velerov2alpha1api.DataUpload{} du := &velerov2alpha1api.DataUpload{}
if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, du); getErr != nil { if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, du); getErr != nil {
log.WithError(getErr).Warn("Failed to get dataupload on cancel") log.WithError(getErr).Warn("Failed to get dataupload on cancel")
} else { return
// cleans up any objects generated during the snapshot expose }
r.cleanUp(ctx, du, log) // cleans up any objects generated during the snapshot expose
original := du.DeepCopy() r.cleanUp(ctx, du, log)
if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
if isDataUploadInFinalState(du) {
return false
}
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
if du.Status.StartTimestamp.IsZero() { if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
} }
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status") return true
} else { }); err != nil {
r.metrics.RegisterDataUploadCancel(r.nodeName) log.WithError(err).Error("error updating DataUpload status")
} } else {
r.metrics.RegisterDataUploadCancel(r.nodeName)
delete(r.cancelledDataUpload, du.Name)
} }
} }
func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { func (r *DataUploadReconciler) tryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) bool {
log := r.logger.WithField("dataupload", du.Name) log := r.logger.WithField("dataupload", du.Name)
log.Warn("Accepted data upload is canceled") succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(dataUpload *velerov2alpha1api.DataUpload) {
succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
if dataUpload.Status.StartTimestamp.IsZero() { if dataUpload.Status.StartTimestamp.IsZero() {
dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
@ -509,16 +556,20 @@ func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context,
if err != nil { if err != nil {
log.WithError(err).Error("error updating dataupload status") log.WithError(err).Error("error updating dataupload status")
return return false
} else if !succeeded { } else if !succeeded {
log.Warn("conflict in updating dataupload status and will try it again later") log.Warn("conflict in updating dataupload status and will try it again later")
return return false
} }
// success update // success update
r.metrics.RegisterDataUploadCancel(r.nodeName) r.metrics.RegisterDataUploadCancel(r.nodeName)
// cleans up any objects generated during the snapshot expose // cleans up any objects generated during the snapshot expose
r.cleanUp(ctx, du, log) r.cleanUp(ctx, du, log)
log.Warn("data upload is canceled")
return true
} }
func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) { func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) {
@ -538,16 +589,10 @@ func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1ap
func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) { func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) {
log := r.logger.WithField("dataupload", duName) log := r.logger.WithField("dataupload", duName)
var du velerov2alpha1api.DataUpload if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: duName}, log, func(du *velerov2alpha1api.DataUpload) bool {
if err := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); err != nil { du.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
log.WithError(err).Warn("Failed to get dataupload on progress") return true
return }); err != nil {
}
original := du.DeepCopy()
du.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Failed to update progress") log.WithError(err).Error("Failed to update progress")
} }
} }
@ -560,7 +605,19 @@ func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespa
func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
gp := kube.NewGenericEventPredicate(func(object client.Object) bool { gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
du := object.(*velerov2alpha1api.DataUpload) du := object.(*velerov2alpha1api.DataUpload)
return (du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted) if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
return true
}
if du.Spec.Cancel && !isDataUploadInFinalState(du) {
return true
}
if isDataUploadInFinalState(du) && !du.DeletionTimestamp.IsZero() {
return true
}
return false
}) })
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataUpload), r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{ s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataUpload), r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{
Predicates: []predicate.Predicate{gp}, Predicates: []predicate.Predicate{gp},
@ -621,10 +678,17 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
if pod.Status.Phase == corev1api.PodRunning { if pod.Status.Phase == corev1api.PodRunning {
log.Info("Preparing dataupload") log.Info("Preparing dataupload")
// we don't expect anyone else update the CR during the Prepare process if err = UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log,
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload) func(du *velerov2alpha1api.DataUpload) bool {
if err != nil || !updated { if isDataUploadInFinalState(du) {
log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload") log.Warnf("dataupload %s is terminated, abort setting it to prepared", du.Name)
return false
}
r.prepareDataUpload(du)
return true
}); err != nil {
log.WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
return []reconcile.Request{} return []reconcile.Request{}
} }
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early
@ -678,18 +742,26 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a
} }
func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) error { func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) error {
original := du.DeepCopy() log.Info("update data upload status to Failed")
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = errors.WithMessage(err, msg).Error()
if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
if dataPathError, ok := err.(datapath.DataPathError); ok { if patchErr := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
du.Status.SnapshotID = dataPathError.GetSnapshotID() if isDataUploadInFinalState(du) {
} return false
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} }
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = errors.WithMessage(err, msg).Error()
if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
if dataPathError, ok := err.(datapath.DataPathError); ok {
du.Status.SnapshotID = dataPathError.GetSnapshotID()
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
return true
}); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status") log.WithError(patchErr).Error("error updating DataUpload status")
} else { } else {
r.metrics.RegisterDataUploadFailure(r.nodeName) r.metrics.RegisterDataUploadFailure(r.nodeName)
@ -711,7 +783,7 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov
dataUpload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()} dataUpload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}
} }
succeeded, err := r.exclusiveUpdateDataUpload(ctx, updated, updateFunc) succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, updated, updateFunc)
if err != nil { if err != nil {
return false, err return false, err
@ -732,7 +804,7 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov
log.Info("Timeout happened for preparing dataupload") log.Info("Timeout happened for preparing dataupload")
succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(du *velerov2alpha1api.DataUpload) {
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = "timeout on preparing data upload" du.Status.Message = "timeout on preparing data upload"
}) })
@ -770,11 +842,13 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov
r.metrics.RegisterDataUploadFailure(r.nodeName) r.metrics.RegisterDataUploadFailure(r.nodeName)
} }
func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, var funcExclusiveUpdateDataUpload = exclusiveUpdateDataUpload
func exclusiveUpdateDataUpload(ctx context.Context, cli client.Client, du *velerov2alpha1api.DataUpload,
updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) { updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) {
updateFunc(du) updateFunc(du)
err := r.client.Update(ctx, du) err := cli.Update(ctx, du)
if err == nil { if err == nil {
return true, nil return true, nil
} }
@ -905,7 +979,7 @@ func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool {
du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted
} }
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataUpload) bool) error { func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataUpload) bool) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
du := &velerov2alpha1api.DataUpload{} du := &velerov2alpha1api.DataUpload{}
if err := client.Get(ctx, namespacedName, du); err != nil { if err := client.Get(ctx, namespacedName, du); err != nil {
@ -939,11 +1013,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logg
for i := range dataUploads.Items { for i := range dataUploads.Items {
du := &dataUploads.Items[i] du := &dataUploads.Items[i]
if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by dataupload controller after node-agent restart
logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared")
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
if du.Status.Node != r.nodeName { if du.Status.Node != r.nodeName {
logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node) logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node)
continue continue
@ -972,23 +1042,10 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logg
if err != nil { if err != nil {
logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
} }
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { } else {
r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase") // the Prepared CR could be still handled by dataupload controller after node-agent restart
// the accepted CR may also suvived from node-agent restart as long as the intermediate objects are all done
err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), logger.WithField("dataupload", du.GetName()).Infof("find a dataupload with status %s", du.Status.Phase)
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = "Dataupload is in Accepted status during the node-agent starting, mark it as cancel"
return true
})
if err != nil {
r.logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
}
} }
} }

View File

@ -45,7 +45,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client" kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
@ -270,35 +269,30 @@ type fakeSnapshotExposer struct {
clock clock.WithTickerAndDelayedExecution clock clock.WithTickerAndDelayedExecution
ambiguousNodeOS bool ambiguousNodeOS bool
peekErr error peekErr error
exposeErr error
getErr error
getNil bool
} }
func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param any) error { func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param any) error {
du := velerov2alpha1api.DataUpload{} if f.exposeErr != nil {
err := f.kubeClient.Get(ctx, kbclient.ObjectKey{ return f.exposeErr
Name: dataUploadName,
Namespace: velerov1api.DefaultNamespace,
}, &du)
if err != nil {
return err
} }
original := du
du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
du.Status.StartTimestamp = &metav1.Time{Time: f.clock.Now()}
f.kubeClient.Patch(ctx, &du, kbclient.MergeFrom(&original))
return nil return nil
} }
func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1api.ObjectReference, tm time.Duration, para any) (*exposer.ExposeResult, error) { func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1api.ObjectReference, tm time.Duration, para any) (*exposer.ExposeResult, error) {
pod := &corev1api.Pod{} if f.getErr != nil {
err := f.kubeClient.Get(ctx, kbclient.ObjectKey{ return nil, f.getErr
Name: dataUploadName,
Namespace: velerov1api.DefaultNamespace,
}, pod)
if err != nil {
return nil, err
} }
if f.getNil {
return nil, nil
}
pod := &corev1api.Pod{}
nodeOS := "linux" nodeOS := "linux"
pNodeOS := &nodeOS pNodeOS := &nodeOS
if f.ambiguousNodeOS { if f.ambiguousNodeOS {
@ -346,214 +340,265 @@ func (b *fakeDataUploadFSBR) Close(ctx context.Context) {
func TestReconcile(t *testing.T) { func TestReconcile(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
du *velerov2alpha1api.DataUpload du *velerov2alpha1api.DataUpload
pod *corev1api.Pod notCreateDU bool
pvc *corev1api.PersistentVolumeClaim needDelete bool
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer sportTime *metav1.Time
dataMgr *datapath.Manager pod *corev1api.Pod
expectedProcessed bool pvc *corev1api.PersistentVolumeClaim
expected *velerov2alpha1api.DataUpload snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
checkFunc func(velerov2alpha1api.DataUpload) bool dataMgr *datapath.Manager
expectedRequeue ctrl.Result needCreateFSBR bool
expectedErrMsg string needExclusiveUpdateError error
needErrs []bool expected *velerov2alpha1api.DataUpload
removeNode bool expectDeleted bool
ambiguousNodeOS bool expectCancelRecord bool
peekErr error needErrs []bool
notCreateFSBR bool ambiguousNodeOS bool
fsBRInitErr error peekErr error
fsBRStartErr error exposeErr error
getExposeErr error
getExposeNil bool
fsBRInitErr error
fsBRStartErr error
expectedErr string
expectedResult *ctrl.Result
expectDataPath bool
}{ }{
{ {
name: "Dataupload is not initialized", name: "du not found",
du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), du: dataUploadBuilder().Result(),
expectedRequeue: ctrl.Result{}, notCreateDU: true,
}, },
{ {
name: "Error get Dataupload", name: "du not created in velero default namespace",
du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), du: builder.ForDataUpload("test-ns", dataUploadName).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "getting DataUpload: Get error",
needErrs: []bool{true, false, false, false},
}, },
{ {
name: "Unsupported data mover type", name: "get du fail",
du: dataUploadBuilder().DataMover("unknown type").Result(), du: dataUploadBuilder().Result(),
expected: dataUploadBuilder().Phase("").Result(), needErrs: []bool{true, false, false, false},
expectedRequeue: ctrl.Result{}, expectedErr: "getting DataUpload: Get error",
}, },
{ {
name: "Unknown type of snapshot exposer is not initialized", name: "du is not for built-in dm",
du: dataUploadBuilder().SnapshotType("unknown type").Result(), du: dataUploadBuilder().DataMover("other").Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "unknown type type of snapshot exposer is not exist",
}, },
{ {
name: "Dataupload should be accepted", name: "add finalizer to du",
du: dataUploadBuilder().Result(), du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1api.Volume{Name: "test-pvc"}).Result(), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
}, },
{ {
name: "Dataupload should fail to get PVC information", name: "add finalizer to du failed",
du: dataUploadBuilder().Result(), du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1api.Volume{Name: "wrong-pvc"}).Result(), needErrs: []bool{false, false, true, false},
expectedProcessed: true, expectedErr: "error updating dataupload with error velero/dataupload-1: Update error",
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "failed to get PVC",
}, },
{ {
name: "Dataupload should fail because expected node doesn't exist", name: "du is under deletion",
du: dataUploadBuilder().Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1api.Volume{Name: "test-pvc"}).Result(), needDelete: true,
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
removeNode: true,
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "no appropriate node to run data upload",
}, },
{ {
name: "Dataupload should be prepared", name: "du is under deletion but cancel failed",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), needErrs: []bool{false, false, true, false},
expectedRequeue: ctrl.Result{}, needDelete: true,
expectedErr: "error updating dataupload with error velero/dataupload-1: Update error",
}, },
{ {
name: "Dataupload prepared should be completed", name: "du is under deletion and in terminal state",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), sportTime: &metav1.Time{Time: time.Now()},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), needDelete: true,
expectedRequeue: ctrl.Result{}, expectDeleted: true,
}, },
{ {
name: "Dataupload should fail if expose returns ambiguous nodeOS", name: "du is under deletion and in terminal state, but remove finalizer failed",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), needErrs: []bool{false, false, true, false},
ambiguousNodeOS: true, needDelete: true,
expectedProcessed: true, expectedErr: "error updating dataupload with error velero/dataupload-1: Update error",
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedErrMsg: "unsupported ambiguous node OS",
}, },
{ {
name: "Dataupload with not enabled cancel", name: "delay cancel negative for others",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), sportTime: &metav1.Time{Time: time.Now()},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), expectCancelRecord: true,
expectedRequeue: ctrl.Result{},
}, },
{ {
name: "Dataupload should be cancel", name: "delay cancel negative for inProgress",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), expectCancelRecord: true,
expectedRequeue: ctrl.Result{},
}, },
{ {
name: "Dataupload should be cancel with match node", name: "delay cancel affirmative for others",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
du: func() *velerov2alpha1api.DataUpload { sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)},
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result() expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
du.Status.Node = "test-node"
return du
}(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
expectedRequeue: ctrl.Result{},
notCreateFSBR: true,
}, },
{ {
name: "Dataupload should not be cancel with mismatch node", name: "delay cancel affirmative for inProgress",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
du: func() *velerov2alpha1api.DataUpload { sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result() expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
du.Status.Node = "different_node"
return du
}(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
notCreateFSBR: true,
}, },
{ {
name: "runCancelableDataUpload is concurrent limited", name: "delay cancel failed",
dataMgr: datapath.NewManager(0), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), needErrs: []bool{false, false, true, false},
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, expectCancelRecord: true,
}, },
{ {
name: "data path init error", name: "Unknown data upload status",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
fsBRInitErr: errors.New("fake-data-path-init-error"),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(),
expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error",
}, },
{ {
name: "Unable to update status to in progress for data download", name: "Unknown type of snapshot exposer is not initialized",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).SnapshotType("unknown type").Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
needErrs: []bool{false, false, false, true}, expectedErr: "unknown type type of snapshot exposer is not exist",
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
}, },
{ {
name: "data path start error", name: "new du but accept failed",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), needExclusiveUpdateError: errors.New("exclusive-update-error"),
fsBRStartErr: errors.New("fake-data-path-start-error"), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedProcessed: true, expectedErr: "error accepting the data upload dataupload-1: exclusive-update-error",
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(),
expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error",
}, },
{ {
name: "prepare timeout", name: "du is cancel on accepted",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
}, },
{ {
name: "peek error", name: "du is accepted but setup expose param failed on getting PVC",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
peekErr: errors.New("fake-peek-error"), expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Message("failed to set exposer parameters").Result(),
expectedProcessed: true, expectedErr: "failed to get PVC fake-ns/test-pvc: persistentvolumeclaims \"test-pvc\" not found",
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
}, },
{ {
name: "Dataupload with enabled cancel", name: "du expose failed",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).SnapshotType(fakeSnapshotType).Result(),
du: func() *velerov2alpha1api.DataUpload { pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result() exposeErr: errors.New("fake-expose-error"),
controllerutil.AddFinalizer(du, DataUploadDownloadFinalizer) expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Message("error exposing snapshot").Result(),
du.DeletionTimestamp = &metav1.Time{Time: time.Now()} expectedErr: "fake-expose-error",
return du
}(),
checkFunc: func(du velerov2alpha1api.DataUpload) bool {
return du.Spec.Cancel
},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
}, },
{ {
name: "Dataupload with remove finalizer and should not be retrieved", name: "du succeeds for accepted",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1api.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).SnapshotType(fakeSnapshotType).Result(),
du: func() *velerov2alpha1api.DataUpload { pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Cancel(true).Result() expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
controllerutil.AddFinalizer(du, DataUploadDownloadFinalizer) },
du.DeletionTimestamp = &metav1.Time{Time: time.Now()} {
return du name: "prepare timeout on accepted",
}(), du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Finalizers([]string{DataUploadDownloadFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(),
checkFunc: func(du velerov2alpha1api.DataUpload) bool { expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Message("timeout on preparing data upload").Result(),
return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer) },
}, {
expectedRequeue: ctrl.Result{}, name: "peek error on accepted",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
peekErr: errors.New("fake-peak-error"),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Message("found a du velero/dataupload-1 with expose error: fake-peak-error. mark it as cancel").Result(),
},
{
name: "cancel on prepared",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Cancel(true).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
},
{
name: "Failed to get snapshot expose on prepared",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
getExposeErr: errors.New("fake-get-error"),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("exposed snapshot is not ready: fake-get-error").Result(),
expectedErr: "fake-get-error",
},
{
name: "Get nil restore expose on prepared",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
getExposeNil: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("exposed snapshot is not ready").Result(),
expectedErr: "no expose result is available for the current node",
},
{
name: "Dataupload should fail if expose returns ambiguous nodeOS",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
ambiguousNodeOS: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "unsupported ambiguous node OS",
},
{
name: "Error in data path is concurrent limited",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
dataMgr: datapath.NewManager(0),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path init error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
fsBRInitErr: errors.New("fake-data-path-init-error"),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error initializing data path").Result(),
expectedErr: "error initializing asyncBR: fake-data-path-init-error",
},
{
name: "Unable to update status to in progress for data upload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
needErrs: []bool{false, false, true, false},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path start error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
fsBRStartErr: errors.New("fake-data-path-start-error"),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Finalizers([]string{DataUploadDownloadFinalizer}).Message("error starting data path").Result(),
expectedErr: "error starting async backup for pod , volume dataupload-1: fake-data-path-start-error",
},
{
name: "Prepare succeeds",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectDataPath: true,
},
{
name: "In progress du is not handled by the current node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "In progress du is not set as cancel",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "Cancel data upload in progress with empty FSBR",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "Cancel data upload in progress and patch data upload error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
needErrs: []bool{false, false, true, false},
needCreateFSBR: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "error updating dataupload with error velero/dataupload-1: Update error",
expectCancelRecord: true,
expectDataPath: true,
},
{
name: "Cancel data upload in progress succeeds",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Node("test-node").Result(),
needCreateFSBR: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Cancel(true).Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectDataPath: true,
expectCancelRecord: true,
}, },
} }
@ -561,24 +606,15 @@ func TestReconcile(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
r, err := initDataUploaderReconciler(test.needErrs...) r, err := initDataUploaderReconciler(test.needErrs...)
require.NoError(t, err) require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) if !test.notCreateDU {
if test.pod != nil { err = r.client.Create(context.Background(), test.du)
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) require.NoError(t, err)
} }
}()
ctx := context.Background() if test.needDelete {
if test.du.Namespace == velerov1api.DefaultNamespace { err = r.client.Delete(context.Background(), test.du)
isDeletionTimestampSet := test.du.DeletionTimestamp != nil
err = r.client.Create(ctx, test.du)
require.NoError(t, err) require.NoError(t, err)
// because of the changes introduced by https://github.com/kubernetes-sigs/controller-runtime/commit/7a66d580c0c53504f5b509b45e9300cc18a1cc30
// the fake client ignores the DeletionTimestamp when calling the Create(),
// so call Delete() here
if isDeletionTimestampSet {
err = r.client.Delete(ctx, test.du)
require.NoError(t, err)
}
} }
if test.pod != nil { if test.pod != nil {
@ -591,38 +627,41 @@ func TestReconcile(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
if test.removeNode {
err = r.kubeClient.CoreV1().Nodes().Delete(ctx, "fake-node", metav1.DeleteOptions{})
require.NoError(t, err)
}
if test.dataMgr != nil { if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr r.dataPathMgr = test.dataMgr
} else { } else {
r.dataPathMgr = datapath.NewManager(1) r.dataPathMgr = datapath.NewManager(1)
} }
if test.sportTime != nil {
r.cancelledDataUpload[test.du.Name] = test.sportTime.Time
}
if test.du.Spec.SnapshotType == fakeSnapshotType { if test.du.Spec.SnapshotType == fakeSnapshotType {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr}} r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr, test.exposeErr, test.getExposeErr, test.getExposeNil}}
} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
} }
if !test.notCreateFSBR {
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { funcExclusiveUpdateDataUpload = exclusiveUpdateDataUpload
return &fakeDataUploadFSBR{ if test.needExclusiveUpdateError != nil {
du: test.du, funcExclusiveUpdateDataUpload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataUpload, func(*velerov2alpha1api.DataUpload)) (bool, error) {
kubeClient: r.client, return false, test.needExclusiveUpdateError
clock: r.Clock,
initErr: test.fsBRInitErr,
startErr: test.fsBRStartErr,
}
} }
} }
testCreateFsBR := false datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { return &fakeDataUploadFSBR{
du: test.du,
kubeClient: r.client,
clock: r.Clock,
initErr: test.fsBRInitErr,
startErr: test.fsBRStartErr,
}
}
if test.needCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil {
testCreateFsBR = true
_, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger())
require.NoError(t, err) require.NoError(t, err)
} }
@ -635,41 +674,46 @@ func TestReconcile(t *testing.T) {
}, },
}) })
assert.Equal(t, test.expectedRequeue, actualResult) if test.expectedErr != "" {
if test.expectedErrMsg == "" { assert.EqualError(t, err, test.expectedErr)
require.NoError(t, err)
} else { } else {
require.ErrorContains(t, err, test.expectedErrMsg) assert.NoError(t, err)
} }
du := velerov2alpha1api.DataUpload{} if test.expectedResult != nil {
err = r.client.Get(ctx, kbclient.ObjectKey{ assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue)
Name: test.du.Name, assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
Namespace: test.du.Namespace,
}, &du)
t.Logf("%s: \n %v \n", test.name, du)
// Assertions
if test.expected == nil {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, test.expected.Status.Phase, du.Status.Phase)
} }
if test.expectedProcessed { if test.expected != nil || test.expectDeleted {
assert.False(t, du.Status.CompletionTimestamp.IsZero()) du := velerov2alpha1api.DataUpload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.du.Name,
Namespace: test.du.Namespace,
}, &du)
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
require.NoError(t, err)
assert.Equal(t, test.expected.Status.Phase, du.Status.Phase)
assert.Contains(t, du.Status.Message, test.expected.Status.Message)
assert.Equal(t, du.Finalizers, test.expected.Finalizers)
assert.Equal(t, du.Spec.Cancel, test.expected.Spec.Cancel)
}
} }
if !test.expectedProcessed { if !test.expectDataPath {
assert.True(t, du.Status.CompletionTimestamp.IsZero())
}
if test.checkFunc != nil {
assert.True(t, test.checkFunc(du))
}
if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name))
} else {
assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.du.Name))
}
if test.expectCancelRecord {
assert.Contains(t, r.cancelledDataUpload, test.du.Name)
} else {
assert.Empty(t, r.cancelledDataUpload)
} }
}) })
} }
@ -719,7 +763,7 @@ func TestOnDataUploadProgress(t *testing.T) {
{ {
name: "failed to patch dataupload", name: "failed to patch dataupload",
du: dataUploadBuilder().Result(), du: dataUploadBuilder().Result(),
needErrs: []bool{false, false, false, true}, needErrs: []bool{false, false, true, false},
}, },
} }
for _, test := range tests { for _, test := range tests {
@ -992,7 +1036,7 @@ func TestTryCancelDataUpload(t *testing.T) {
err = r.client.Create(ctx, test.dd) err = r.client.Create(ctx, test.dd)
require.NoError(t, err) require.NoError(t, err)
r.tryCancelAcceptedDataUpload(ctx, test.dd, "") r.tryCancelDataUpload(ctx, test.dd, "")
if test.expectedErr == "" { if test.expectedErr == "" {
assert.NoError(t, err) assert.NoError(t, err)
@ -1114,33 +1158,8 @@ func TestAttemptDataUploadResume(t *testing.T) {
expectedError string expectedError string
}{ }{
{ {
name: "accepted DataUpload in other node", name: "Other DataUpload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).AcceptedByNode("node-1").Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node but canceled",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).AcceptedByNode("node-1").Cancel(true).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node but update error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).AcceptedByNode("node-1").Result(),
needErrs: []bool{false, false, true, false, false, false},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "prepared DataUpload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
prepareddDataUploads: []string{dataUploadName},
}, },
{ {
name: "InProgress DataUpload, not the current node", name: "InProgress DataUpload, not the current node",