Mark dataupload/datadownload status failed when the velero pod restarts
Signed-off-by: Ming Qiu <mqiu@vmware.com>
parent e54a8af0ad
commit 480fe445b1
@@ -256,11 +256,15 @@ func (s *nodeAgentServer) run() {
         s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
     }

-    if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil {
+    dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger)
+    s.markDataUploadsCancel(dataUploadReconciler)
+    if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
         s.logger.WithError(err).Fatal("Unable to create the data upload controller")
     }

-    if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil {
+    dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger)
+    s.markDataDownloadsCancel(dataDownloadReconciler)
+    if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
         s.logger.WithError(err).Fatal("Unable to create the data download controller")
     }
@@ -333,6 +337,62 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
     s.markInProgressPVRsFailed(client)
 }

+func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) {
+    // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
+    client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
+    if err != nil {
+        s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
+        return
+    }
+    if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil {
+        s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads")
+    } else {
+        for i := range dataUploads {
+            du := dataUploads[i]
+            if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
+                du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
+                du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
+                updated := du.DeepCopy()
+                updated.Spec.Cancel = true
+                updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during node-agent startup, marking it as canceled", du.Status.Phase)
+                if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
+                    s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
+                    continue
+                }
+                s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
+            }
+        }
+    }
+}
+
+func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) {
+    // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
+    client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
+    if err != nil {
+        s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
+        return
+    }
+    if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil {
+        s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
+    } else {
+        for i := range dataDownloads {
+            dd := dataDownloads[i]
+            if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
+                dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
+                dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
+                updated := dd.DeepCopy()
+                updated.Spec.Cancel = true
+                updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during node-agent startup, marking it as canceled", dd.Status.Phase)
+                if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
+                    s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
+                    continue
+                }
+                s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
+            }
+        }
+    }
+}
+
 func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
     pvbs := &velerov1api.PodVolumeBackupList{}
     if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
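Note that the sweep above never flips a CR straight to a terminal phase; it only requests cancellation through the spec and lets the reconciler finish the job. A minimal sketch of that merge-patch pattern, pulled out into a hypothetical helper (requestCancel and its message are not part of this commit):

package main

import (
	"context"

	"github.com/sirupsen/logrus"
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// requestCancel mirrors the pattern used by markDataUploadsCancel above:
// copy the object, set Spec.Cancel, and send a merge patch computed against
// the original, so only the changed fields go to the API server. The
// reconciler observes Spec.Cancel on its next pass and drives the CR to the
// Canceled phase.
func requestCancel(ctx context.Context, cli ctrlclient.Client, du velerov2alpha1api.DataUpload, log logrus.FieldLogger) {
	updated := du.DeepCopy()
	updated.Spec.Cancel = true
	updated.Status.Message = "requesting cancellation after node-agent restart" // hypothetical message

	if err := cli.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
		log.WithError(err).Errorf("failed to mark dataupload %q cancel", du.GetName())
		return
	}
	log.WithField("dataupload", du.GetName()).Info("cancellation requested")
}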
@@ -1065,7 +1065,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
     }

     for i, backup := range backups.Items {
-        if backup.Status.Phase != velerov1api.BackupPhaseInProgress {
+        if backup.Status.Phase != velerov1api.BackupPhaseInProgress && backup.Status.Phase != velerov1api.BackupPhaseWaitingForPluginOperations {
             log.Debugf("the status of backup %q is %q, skip", backup.GetName(), backup.Status.Phase)
             continue
         }

@@ -1078,6 +1078,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
             continue
         }
         log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
+        markDataUploadsCancel(ctx, client, backup, log)
     }
 }

@@ -1088,7 +1089,7 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
         return
     }
     for i, restore := range restores.Items {
-        if restore.Status.Phase != velerov1api.RestorePhaseInProgress {
+        if restore.Status.Phase != velerov1api.RestorePhaseInProgress && restore.Status.Phase != velerov1api.RestorePhaseWaitingForPluginOperations {
             log.Debugf("the status of restore %q is %q, skip", restore.GetName(), restore.Status.Phase)
             continue
         }

@@ -1101,5 +1102,56 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
             continue
         }
         log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
+        markDataDownloadsCancel(ctx, client, restore, log)
     }
 }
+
+func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup velerov1api.Backup, log logrus.FieldLogger) {
+    dataUploads := &velerov2alpha1api.DataUploadList{}
+
+    if err := client.List(ctx, dataUploads, &ctrlclient.MatchingFields{"metadata.namespace": backup.GetNamespace()}, &ctrlclient.MatchingLabels{velerov1api.BackupUIDLabel: string(backup.GetUID())}); err != nil {
+        log.WithError(errors.WithStack(err)).Error("failed to list dataUploads")
+        return
+    }
+
+    for i := range dataUploads.Items {
+        du := dataUploads.Items[i]
+        if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
+            du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
+            du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
+            updated := du.DeepCopy()
+            updated.Spec.Cancel = true
+            updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during velero server startup, marking it as canceled", du.Status.Phase)
+            if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
+                log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
+                continue
+            }
+            log.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
+        }
+    }
+}
+
+func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, restore velerov1api.Restore, log logrus.FieldLogger) {
+    dataDownloads := &velerov2alpha1api.DataDownloadList{}
+
+    if err := client.List(ctx, dataDownloads, &ctrlclient.MatchingFields{"metadata.namespace": restore.GetNamespace()}, &ctrlclient.MatchingLabels{velerov1api.RestoreUIDLabel: string(restore.GetUID())}); err != nil {
+        log.WithError(errors.WithStack(err)).Error("failed to list dataDownloads")
+        return
+    }
+
+    for i := range dataDownloads.Items {
+        dd := dataDownloads.Items[i]
+        if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
+            dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
+            dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
+            updated := dd.DeepCopy()
+            updated.Spec.Cancel = true
+            updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during velero server startup, marking it as canceled", dd.Status.Phase)
+            if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
+                log.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
+                continue
+            }
+            log.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
+        }
+    }
+}
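The two server-side helpers above find a backup's or restore's children through the velero.io/backup-uid and velero.io/restore-uid labels. A hedged sketch of the same lookup as a standalone helper; note the commit itself filters the namespace with ctrlclient.MatchingFields{"metadata.namespace": ...}, for which InNamespace below is the equivalent option:

package main

import (
	"context"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// dataUploadsForBackup is a hypothetical helper, not part of this commit.
// DataUploads are stamped with the owning backup's UID as a label, so the
// server can find the children of a failed backup with one filtered List
// instead of scanning every DataUpload in the namespace.
func dataUploadsForBackup(ctx context.Context, cli ctrlclient.Client, backup *velerov1api.Backup) ([]velerov2alpha1api.DataUpload, error) {
	list := &velerov2alpha1api.DataUploadList{}
	err := cli.List(ctx, list,
		ctrlclient.InNamespace(backup.GetNamespace()),
		ctrlclient.MatchingLabels{velerov1api.BackupUIDLabel: string(backup.GetUID())},
	)
	if err != nil {
		return nil, err
	}
	return list.Items, nil
}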
@@ -57,7 +57,7 @@ type DataDownloadReconciler struct {
     logger            logrus.FieldLogger
     credentialGetter  *credentials.CredentialGetter
     fileSystem        filesystem.Interface
-    clock             clock.WithTickerAndDelayedExecution
+    Clock             clock.WithTickerAndDelayedExecution
     restoreExposer    exposer.GenericRestoreExposer
     nodeName          string
     repositoryEnsurer *repository.Ensurer

@@ -72,7 +72,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
         logger:            logger.WithField("controller", "DataDownload"),
         credentialGetter:  credentialGetter,
         fileSystem:        filesystem.NewFileSystem(),
-        clock:             &clock.RealClock{},
+        Clock:             &clock.RealClock{},
         nodeName:          nodeName,
         repositoryEnsurer: repoEnsurer,
         restoreExposer:    exposer.NewGenericRestoreExposer(kubeClient, logger),
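Renaming the unexported clock field to the exported Clock is what lets tests (including the test changes later in this commit) substitute a deterministic clock without a dedicated constructor parameter. A sketch, assuming the fake clock from k8s.io/utils:

package controller

import (
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

// freezeClock is a hypothetical test helper, not part of this commit.
// FakeClock implements clock.WithTickerAndDelayedExecution, so it is a
// drop-in replacement for the RealClock set by the constructor; Reconcile
// then stamps Status.StartTimestamp/CompletionTimestamp with `at`.
func freezeClock(r *DataDownloadReconciler, at time.Time) {
	r.Clock = clocktesting.NewFakeClock(at)
}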
@@ -132,9 +132,15 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

         log.Info("Data download is accepted")

+        if dd.Spec.Cancel {
+            log.Debugf("Data download %s is being canceled in phase %s", dd.GetName(), dd.Status.Phase)
+            r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
+            return ctrl.Result{}, nil
+        }
+
         hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}

-        // ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
+        // Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
         // but the pod maybe is not in the same node of the current controller, so we need to return it here.
         // And then only the controller who is in the same node could do the rest work.
         err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)

@@ -143,9 +149,22 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
         }
         log.Info("Restore is exposed")

         return ctrl.Result{}, nil
+    } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
+        if dd.Spec.Cancel {
+            log.Debugf("Data download %s is being canceled in phase %s", dd.GetName(), dd.Status.Phase)
+            r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
+        }
+        return ctrl.Result{}, nil
     } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
         log.Info("Data download is prepared")

+        if dd.Spec.Cancel {
+            log.Debugf("Data download %s is being canceled in phase %s", dd.GetName(), dd.Status.Phase)
+            r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
+            return ctrl.Result{}, nil
+        }
+
         fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)

         if fsRestore != nil {
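The same Spec.Cancel check now sits at the top of each pre-terminal branch of Reconcile. Condensed into a hypothetical helper to make the gating visible (not part of this commit; InProgress cancels are handled separately through the data path manager):

package controller

import (
	"context"

	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)

// shouldStopForCancel is a hypothetical condensation of the branches above:
// for any pre-terminal phase, a cancel request short-circuits the state
// machine before any new work is started.
func (r *DataDownloadReconciler) shouldStopForCancel(ctx context.Context, dd *velerov2alpha1api.DataDownload) bool {
	if !dd.Spec.Cancel {
		return false
	}
	switch dd.Status.Phase {
	case "", velerov2alpha1api.DataDownloadPhaseNew,
		velerov2alpha1api.DataDownloadPhaseAccepted,
		velerov2alpha1api.DataDownloadPhasePrepared:
		// OnDataDownloadCancelled stamps the timestamps and patches the CR
		// to the Canceled phase.
		r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
		return true
	}
	return false
}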
@@ -184,7 +203,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
     // Update status to InProgress
     original := dd.DeepCopy()
     dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
-    dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+    dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
         log.WithError(err).Error("Unable to update status to in progress")
         return ctrl.Result{}, err

@@ -271,7 +290,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na

     original := dd.DeepCopy()
     dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
-    dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
         log.WithError(err).Error("error updating data download status")
     } else {

@@ -313,9 +332,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
     original := dd.DeepCopy()
     dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
     if dd.Status.StartTimestamp.IsZero() {
-        dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+        dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
     }
-    dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
         log.WithError(err).Error("error updating data download status")
     }
@@ -382,15 +401,13 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
     pod := podObj.(*v1.Pod)

-    dd := &velerov2alpha1api.DataDownload{}
-    err := r.client.Get(context.Background(), types.NamespacedName{
-        Namespace: pod.Namespace,
-        Name:      pod.Labels[velerov1api.DataDownloadLabel],
-    }, dd)
-
+    dd, err := findDataDownloadByPod(r.client, *pod)
     if err != nil {
         r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
         return []reconcile.Request{}
+    } else if dd == nil {
+        r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
+        return []reconcile.Request{}
     }

     if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
@@ -416,6 +433,30 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
     return requests
 }

+func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
+    pods := &v1.PodList{}
+    var dataDownloads []*velerov2alpha1api.DataDownload
+    if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
+        r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
+        return nil, errors.Wrapf(err, "failed to list pods on current node")
+    }
+
+    for _, pod := range pods.Items {
+        if pod.Spec.NodeName != r.nodeName {
+            r.logger.Debugf("Data download related to pod %s will not be handled by node %s", pod.GetName(), r.nodeName)
+            continue
+        }
+        dd, err := findDataDownloadByPod(cli, pod)
+        if err != nil {
+            r.logger.WithError(errors.WithStack(err)).Error("failed to get dataDownload by pod")
+            continue
+        } else if dd != nil {
+            dataDownloads = append(dataDownloads, dd)
+        }
+    }
+    return dataDownloads, nil
+}
+
 func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
     original := req.DeepCopy()
     mutate(req)
@@ -442,7 +483,7 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
     original := dd.DeepCopy()
     dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
     dd.Status.Message = errors.WithMessage(err, msg).Error()
-    dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

     if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
         log.WithError(patchErr).Error("error updating DataDownload status")
@@ -491,3 +532,20 @@ func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectRef
         APIVersion: dd.APIVersion,
     }
 }
+
+func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api.DataDownload, error) {
+    if label, exist := pod.Labels[velerov1api.DataDownloadLabel]; exist {
+        dd := &velerov2alpha1api.DataDownload{}
+        err := client.Get(context.Background(), types.NamespacedName{
+            Namespace: pod.Namespace,
+            Name:      label,
+        }, dd)
+
+        if err != nil {
+            return nil, errors.Wrapf(err, "error finding DataDownload by pod %s/%s", pod.Namespace, pod.Name)
+        }
+        return dd, nil
+    }
+
+    return nil, nil
+}
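A test-style sketch (not part of this commit, and assuming it lives in the same controller package) of the label contract the new helper relies on: the hosting pod carries velero.io/data-download set to the DataDownload's name, so the reverse lookup is a plain Get, and a pod without the label resolves to (nil, nil) rather than an error.

package controller

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)

func TestFindDataDownloadByPodSketch(t *testing.T) {
	scheme := runtime.NewScheme()
	require.NoError(t, velerov2alpha1api.AddToScheme(scheme))

	// A DataDownload and a pod labeled with its name, as Expose() would do.
	dd := &velerov2alpha1api.DataDownload{
		ObjectMeta: metav1.ObjectMeta{Namespace: "velero", Name: "dd-1"},
	}
	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "velero",
			Name:      "restore-pod",
			Labels:    map[string]string{velerov1api.DataDownloadLabel: "dd-1"},
		},
	}

	cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(dd).Build()

	got, err := findDataDownloadByPod(cli, pod)
	require.NoError(t, err)
	assert.Equal(t, "dd-1", got.Name)
}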
@@ -550,6 +550,14 @@ func TestFindDataDownloadForPod(t *testing.T) {
             assert.Equal(t, du.Namespace, requests[0].Namespace)
             assert.Equal(t, du.Name, requests[0].Name)
         },
+    }, {
+        name: "no selected label found for pod",
+        du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
+        pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Result(),
+        checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
+            // Assert that the function returns no requests
+            assert.Empty(t, requests)
+        },
     }, {
         name: "no matched pod",
         du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),

@@ -559,7 +567,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
         },
     },
     {
-        name: "dataDownload not accepte",
+        name: "dataDownload not accepted",
         du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
         pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
         checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
@@ -61,7 +61,7 @@ type DataUploadReconciler struct {
     kubeClient        kubernetes.Interface
     csiSnapshotClient snapshotter.SnapshotV1Interface
     repoEnsurer       *repository.Ensurer
-    clock             clocks.WithTickerAndDelayedExecution
+    Clock             clocks.WithTickerAndDelayedExecution
     credentialGetter  *credentials.CredentialGetter
     nodeName          string
     fileSystem        filesystem.Interface

@@ -77,7 +77,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
         client:            client,
         kubeClient:        kubeClient,
         csiSnapshotClient: csiSnapshotClient,
-        clock:             clock,
+        Clock:             clock,
         credentialGetter:  cred,
         nodeName:          nodeName,
         fileSystem:        fs,
@@ -134,18 +134,34 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)

         log.Info("Data upload is accepted")

+        if du.Spec.Cancel {
+            r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
+            return ctrl.Result{}, nil
+        }
+
         exposeParam := r.setupExposeParam(&du)

         if err := ep.Expose(ctx, getOwnerObject(&du), exposeParam); err != nil {
             return r.errorOut(ctx, &du, err, "error to expose snapshot", log)
         }
         log.Info("Snapshot is exposed")
-        // ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
+        // Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
         // but the pod maybe is not in the same node of the current controller, so we need to return it here.
         // And then only the controller who is in the same node could do the rest work.
         return ctrl.Result{}, nil
+    } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
+        if du.Spec.Cancel {
+            r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
+        }
+        return ctrl.Result{}, nil
     } else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
         log.Info("Data upload is prepared")

+        if du.Spec.Cancel {
+            r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
+            return ctrl.Result{}, nil
+        }
+
         fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
         if fsBackup != nil {
             log.Info("Cancellable data path is already started")
@@ -183,7 +199,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
     // Update status to InProgress
     original := du.DeepCopy()
     du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
-    du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+    du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
         return r.errorOut(ctx, &du, err, "error updating dataupload status", log)
     }

@@ -277,7 +293,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
     du.Status.Path = result.Backup.Source.ByPath
     du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
     du.Status.SnapshotID = result.Backup.SnapshotID
-    du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if result.Backup.EmptySnapshot {
         du.Status.Message = "volume was empty so no data was upload"
     }

@@ -331,9 +347,9 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
     original := du.DeepCopy()
     du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
     if du.Status.StartTimestamp.IsZero() {
-        du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+        du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
     }
-    du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
         log.WithError(err).Error("error updating DataUpload status")
     }
@@ -399,16 +415,13 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {

 func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
     pod := podObj.(*corev1.Pod)

-    du := &velerov2alpha1api.DataUpload{}
-    err := r.client.Get(context.Background(), types.NamespacedName{
-        Namespace: pod.Namespace,
-        Name:      pod.Labels[velerov1api.DataUploadLabel],
-    }, du)
-
+    du, err := findDataUploadByPod(r.client, *pod)
     if err != nil {
         r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
         return []reconcile.Request{}
+    } else if du == nil {
+        r.logger.WithField("Backup pod", pod.Name).Error("get empty DataUpload")
+        return []reconcile.Request{}
     }

     if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
@@ -430,6 +443,30 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
     return []reconcile.Request{requests}
 }

+func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
+    pods := &corev1.PodList{}
+    var dataUploads []velerov2alpha1api.DataUpload
+    if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
+        r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
+        return nil, errors.Wrapf(err, "failed to list pods on current node")
+    }
+
+    for _, pod := range pods.Items {
+        if pod.Spec.NodeName != r.nodeName {
+            r.logger.Debugf("Data upload related to pod %s will not be handled by node %s", pod.GetName(), r.nodeName)
+            continue
+        }
+        du, err := findDataUploadByPod(cli, pod)
+        if err != nil {
+            r.logger.WithError(errors.WithStack(err)).Error("failed to get dataUpload by pod")
+            continue
+        } else if du != nil {
+            dataUploads = append(dataUploads, *du)
+        }
+    }
+    return dataUploads, nil
+}
+
 func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error {
     original := req.DeepCopy()
     mutate(req)
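FindDataUploads (like its DataDownload counterpart) lists every pod in the namespace and filters by Spec.NodeName client-side, which is reasonable for the small velero namespace. A hedged alternative for larger namespaces is to push the filter to the API server:

package controller

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// podsOnNode is a hypothetical alternative, not part of this commit: with a
// live (non-cached) client the API server supports the spec.nodeName field
// selector for pods, so the filtering happens server-side. A cached client
// would instead need a field index registered for this key.
func podsOnNode(ctx context.Context, cli client.Client, ns, nodeName string) (*corev1.PodList, error) {
	pods := &corev1.PodList{}
	err := cli.List(ctx, pods,
		client.InNamespace(ns),
		client.MatchingFields{"spec.nodeName": nodeName},
	)
	return pods, err
}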
@@ -462,10 +499,10 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
     du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
     du.Status.Message = errors.WithMessage(err, msg).Error()
     if du.Status.StartTimestamp.IsZero() {
-        du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+        du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
     }

-    du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
         log.WithError(patchErr).Error("error updating DataUpload status")
     }

@@ -535,3 +572,19 @@ func getOwnerObject(du *velerov2alpha1api.DataUpload) corev1.ObjectReference {
         APIVersion: du.APIVersion,
     }
 }
+
+func findDataUploadByPod(client client.Client, pod corev1.Pod) (*velerov2alpha1api.DataUpload, error) {
+    if label, exist := pod.Labels[velerov1api.DataUploadLabel]; exist {
+        du := &velerov2alpha1api.DataUpload{}
+        err := client.Get(context.Background(), types.NamespacedName{
+            Namespace: pod.Namespace,
+            Name:      label,
+        }, du)
+
+        if err != nil {
+            return nil, errors.Wrapf(err, "error finding DataUpload by pod %s/%s", pod.Namespace, pod.Name)
+        }
+        return du, nil
+    }
+    return nil, nil
+}
@@ -334,7 +334,7 @@ func TestReconcile(t *testing.T) {
     name:    "runCancelableDataUpload is concurrent limited",
     dataMgr: datapath.NewManager(0),
     pod:     builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
-    du:      dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Cancel(true).Result(),
+    du:      dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
     expectedProcessed: false,
     expected:          dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
     expectedRequeue:   ctrl.Result{Requeue: true, RequeueAfter: time.Minute},

@@ -369,7 +369,7 @@ func TestReconcile(t *testing.T) {
     }

     if test.du.Spec.SnapshotType == fakeSnapshotType {
-        r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.clock}}
+        r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock}}
     } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
         r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
     }

@@ -378,7 +378,7 @@ func TestReconcile(t *testing.T) {
     return &fakeDataUploadFSBR{
         du:         test.du,
         kubeClient: r.client,
-        clock:      r.clock,
+        clock:      r.Clock,
     }
 }

@@ -569,6 +569,14 @@ func TestFindDataUploadForPod(t *testing.T) {
         assert.Equal(t, du.Namespace, requests[0].Namespace)
         assert.Equal(t, du.Name, requests[0].Name)
     },
+    }, {
+        name: "no selected label found for pod",
+        du:   dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
+        pod:  builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Result(),
+        checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
+            // Assert that the function returns no requests
+            assert.Empty(t, requests)
+        },
     }, {
         name: "no matched pod",
         du:   dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),