From 03dff100a3303a2b3d6c916cbe28a4368ca15d57 Mon Sep 17 00:00:00 2001
From: Ming
Date: Thu, 2 Nov 2023 11:05:39 +0000
Subject: [PATCH] Make data mover fail early

Signed-off-by: Ming
---
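Notes (below the "---" cut line, so git am ignores them): the pod-event
handlers registered in SetupWithManager no longer drop events for non-Running
pods. Instead, findDataUploadForPod and findSnapshotRestoreForPod branch three
ways: prepare the CR when the exposing pod is Running, cancel it when the pod
is unrecoverable, and otherwise keep waiting. A minimal, self-contained sketch
of that decision follows; the Pod, ContainerState, and Decision types and all
function names are illustrative stand-ins, not the corev1/Velero types used in
the diff itself.

  package main

  import "fmt"

  // ContainerState is a simplified stand-in for corev1.ContainerState.
  type ContainerState struct{ WaitingReason string }

  // Pod is a simplified stand-in for corev1.Pod.
  type Pod struct {
  	Phase      string // "Running", "Pending", "Failed", "Unknown", ...
  	Containers []ContainerState
  }

  type Decision string

  const (
  	Skip    Decision = "skip"    // not actionable yet, wait for the next event
  	Prepare Decision = "prepare" // pod is running, move the CR to Prepared
  	Cancel  Decision = "cancel"  // pod is unrecoverable, cancel the CR to fail early
  )

  // isUnrecoverable mirrors the checks in the new kube.IsPodUnrecoverable
  // helper: a Failed/Unknown phase, or a container stuck in an image-pull failure.
  func isUnrecoverable(pod Pod) (bool, string) {
  	if pod.Phase == "Failed" || pod.Phase == "Unknown" {
  		return true, fmt.Sprintf("pod is in abnormal state %s", pod.Phase)
  	}
  	for _, c := range pod.Containers {
  		if c.WaitingReason == "ImagePullBackOff" || c.WaitingReason == "ErrImageNeverPull" {
  			return true, fmt.Sprintf("image pull failed with reason %s", c.WaitingReason)
  		}
  	}
  	return false, ""
  }

  // decide mirrors the three-way branch added to findDataUploadForPod and
  // findSnapshotRestoreForPod in this patch.
  func decide(pod Pod) Decision {
  	if pod.Phase == "Running" {
  		return Prepare
  	}
  	if ok, _ := isUnrecoverable(pod); ok {
  		return Cancel
  	}
  	return Skip
  }

  func main() {
  	fmt.Println(decide(Pod{Phase: "Running"})) // prepare
  	fmt.Println(decide(Pod{
  		Phase:      "Pending",
  		Containers: []ContainerState{{WaitingReason: "ImagePullBackOff"}},
  	})) // cancel
  	fmt.Println(decide(Pod{Phase: "Pending"})) // skip
  }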
 changelogs/unreleased/7052-qiuming-best       |  2 +
 pkg/builder/pod_builder.go                    |  5 ++
 pkg/controller/data_download_controller.go    | 50 +++++++-----
 .../data_download_controller_test.go          |  2 +-
 pkg/controller/data_upload_controller.go      | 49 ++++++++----
 pkg/controller/data_upload_controller_test.go |  2 +-
 pkg/util/kube/pod.go                          | 21 +++++
 pkg/util/kube/pod_test.go                     | 79 +++++++++++++++++++
 8 files changed, 170 insertions(+), 40 deletions(-)
 create mode 100644 changelogs/unreleased/7052-qiuming-best

diff --git a/changelogs/unreleased/7052-qiuming-best b/changelogs/unreleased/7052-qiuming-best
new file mode 100644
index 000000000..e1829fd74
--- /dev/null
+++ b/changelogs/unreleased/7052-qiuming-best
@@ -0,0 +1,2 @@
+Make data mover fail early
+
diff --git a/pkg/builder/pod_builder.go b/pkg/builder/pod_builder.go
index 886d7a411..50f8f5e51 100644
--- a/pkg/builder/pod_builder.go
+++ b/pkg/builder/pod_builder.go
@@ -106,3 +106,8 @@ func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder {
 	b.object.Status.Phase = phase
 	return b
 }
+
+func (b *PodBuilder) Status(status corev1api.PodStatus) *PodBuilder {
+	b.object.Status = status
+	return b
+}
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index 065c48a5f..d28614cc6 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -485,10 +485,6 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			return false
 		}
 
-		if newObj.Status.Phase != v1.PodRunning {
-			return false
-		}
-
 		if newObj.Spec.NodeName == "" {
 			return false
 		}
@@ -510,43 +506,55 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
 	pod := podObj.(*v1.Pod)
 	dd, err := findDataDownloadByPod(r.client, *pod)
+
+	log := r.logger.WithField("pod", pod.Name)
 	if err != nil {
-		r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
+		log.WithError(err).Error("unable to get DataDownload")
 		return []reconcile.Request{}
 	} else if dd == nil {
-		r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
+		log.Error("get empty DataDownload")
 		return []reconcile.Request{}
 	}
 
+	log = log.WithFields(logrus.Fields{
+		"Datadownload": dd.Name,
+	})
 	if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
 		return []reconcile.Request{}
 	}
 
-	requests := make([]reconcile.Request, 1)
+	if pod.Status.Phase == v1.PodRunning {
+		log.Info("Preparing data download")
+		// we don't expect anyone else to update the CR during the Prepare process
+		updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
+		if err != nil || !updated {
+			log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
+			return []reconcile.Request{}
+		}
+	} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
+		err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownload", dd.Name),
+			func(dataDownload *velerov2alpha1api.DataDownload) {
+				dataDownload.Spec.Cancel = true
+				dataDownload.Status.Message = fmt.Sprintf("datadownload is marked as cancelled to fail early because the exposing pod %s/%s is in abnormal status: %s", pod.Namespace, pod.Name, reason)
+			})
 
-	r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
-
-	// we don't expect anyone else update the CR during the Prepare process
-	updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
-	if err != nil || !updated {
-		r.logger.WithFields(logrus.Fields{
-			"Datadownload": dd.Name,
-			"Restore pod":  pod.Name,
-			"updated":      updated,
-		}).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
+		if err != nil {
+			log.WithError(err).Warn("failed to cancel datadownload; it will wait for the prepare timeout")
+			return []reconcile.Request{}
+		}
+		log.Info("Exposed pod is in abnormal status, datadownload is marked as cancelled")
+	} else {
 		return []reconcile.Request{}
 	}
 
-	requests[0] = reconcile.Request{
+	request := reconcile.Request{
 		NamespacedName: types.NamespacedName{
 			Namespace: dd.Namespace,
 			Name:      dd.Name,
 		},
 	}
-
-	return requests
+	return []reconcile.Request{request}
 }
 
 func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go
index afdadf61d..a3c44a7e3 100644
--- a/pkg/controller/data_download_controller_test.go
+++ b/pkg/controller/data_download_controller_test.go
@@ -632,7 +632,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
 		{
 			name: "find dataDownload for pod",
 			du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
-			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
+			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
 			checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
 				// Assert that the function returns a single request
 				assert.Len(t, requests, 1)
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index bbdf13943..ef28378e5 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -521,10 +521,6 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			return false
 		}
 
-		if newObj.Status.Phase != corev1.PodRunning {
-			return false
-		}
-
 		if newObj.Spec.NodeName != r.nodeName {
 			return false
 		}
@@ -547,37 +543,56 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
 	pod := podObj.(*corev1.Pod)
 	du, err := findDataUploadByPod(r.client, *pod)
+	log := r.logger.WithFields(logrus.Fields{
+		"Backup pod": pod.Name,
+	})
+
 	if err != nil {
-		r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
+		log.WithError(err).Error("unable to get dataupload")
 		return []reconcile.Request{}
 	} else if du == nil {
-		r.logger.WithField("Backup pod", pod.Name).Error("get empty DataUpload")
+		log.Error("get empty DataUpload")
 		return []reconcile.Request{}
 	}
 
+	log = log.WithFields(logrus.Fields{
+		"Dataupload": du.Name,
+	})
 	if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
 		return []reconcile.Request{}
 	}
 
-	r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
-	// we don't expect anyone else update the CR during the Prepare process
-	updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
-	if err != nil || !updated {
-		r.logger.WithFields(logrus.Fields{
-			"Dataupload": du.Name,
-			"Backup pod": pod.Name,
-			"updated":    updated,
-		}).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
+	if pod.Status.Phase == corev1.PodRunning {
+		log.Info("Preparing dataupload")
+		// we don't expect anyone else to update the CR during the Prepare process
+		updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
+		if err != nil || !updated {
+			log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
+			return []reconcile.Request{}
+		}
+	} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // fail the abnormal backup pod early
+		err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
+			func(dataUpload *velerov2alpha1api.DataUpload) {
+				dataUpload.Spec.Cancel = true
+				dataUpload.Status.Message = fmt.Sprintf("dataupload is marked as cancelled to fail early because the exposing pod %s/%s is in abnormal status: %s", pod.Namespace, pod.Name, reason)
+			})
+
+		if err != nil {
+			log.WithError(err).Warn("failed to cancel dataupload; it will wait for the prepare timeout")
+			return []reconcile.Request{}
+		}
+		log.Info("Exposed pod is in abnormal status, dataupload is marked as cancelled")
+	} else {
 		return []reconcile.Request{}
 	}
 
-	requests := reconcile.Request{
+	request := reconcile.Request{
 		NamespacedName: types.NamespacedName{
 			Namespace: du.Namespace,
 			Name:      du.Name,
 		},
 	}
-	return []reconcile.Request{requests}
+	return []reconcile.Request{request}
 }
 
 func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go
index cdbadf9d4..25ea662bc 100644
--- a/pkg/controller/data_upload_controller_test.go
+++ b/pkg/controller/data_upload_controller_test.go
@@ -683,7 +683,7 @@ func TestFindDataUploadForPod(t *testing.T) {
 		{
 			name: "find dataUpload for pod",
 			du:   dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
-			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(),
+			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
 			checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
 				// Assert that the function returns a single request
 				assert.Len(t, requests, 1)
diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go
index c1464a3d6..d93657878 100644
--- a/pkg/util/kube/pod.go
+++ b/pkg/util/kube/pod.go
@@ -17,6 +17,7 @@ package kube
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/pkg/errors"
@@ -110,3 +111,23 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface
 
 	return nil
 }
+
+// IsPodUnrecoverable checks whether the pod is in an abnormal state from which it cannot recover.
+// It does not cover all cases, but more can be added in the future.
+func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, string) {
+	// Check the Phase field
+	if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown {
+		log.Warnf("Pod is in abnormal state %s", pod.Status.Phase)
+		return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase)
+	}
+
+	// Check the container statuses
+	for _, containerStatus := range pod.Status.ContainerStatuses {
+		// A waiting reason of ImagePullBackOff or ErrImageNeverPull indicates an image pull failure
+		if containerStatus.State.Waiting != nil && (containerStatus.State.Waiting.Reason == "ImagePullBackOff" || containerStatus.State.Waiting.Reason == "ErrImageNeverPull") {
+			log.Warnf("Image pull failed for container %s in Pod %s/%s, reason %s", containerStatus.Name, pod.Namespace, pod.Name, containerStatus.State.Waiting.Reason)
+			return true, fmt.Sprintf("image pull failed for container %s in Pod %s/%s, reason %s", containerStatus.Name, pod.Namespace, pod.Name, containerStatus.State.Waiting.Reason)
+		}
+	}
+	return false, ""
+}
diff --git a/pkg/util/kube/pod_test.go b/pkg/util/kube/pod_test.go
index 6f39c0b23..f1cdac043 100644
--- a/pkg/util/kube/pod_test.go
+++ b/pkg/util/kube/pod_test.go
@@ -343,3 +343,82 @@ func TestDeletePodIfAny(t *testing.T) {
 		})
 	}
 }
+
+func TestIsPodUnrecoverable(t *testing.T) {
+	tests := []struct {
+		name string
+		pod  *corev1api.Pod
+		want bool
+	}{
+		{
+			name: "pod is in failed state",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					Phase: corev1api.PodFailed,
+				},
+			},
+			want: true,
+		},
+		{
+			name: "pod is in unknown state",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					Phase: corev1api.PodUnknown,
+				},
+			},
+			want: true,
+		},
+		{
+			name: "container image pull failure",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "container image pull failure with reason ErrImageNeverPull",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ErrImageNeverPull"}}},
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "container waiting with an unrelated reason",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "OtherReason"}}},
+					},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "pod is normal",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					Phase: corev1api.PodRunning,
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Ready: true, State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}},
+					},
+				},
+			},
+			want: false,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got, _ := IsPodUnrecoverable(test.pod, velerotest.NewLogger())
+			assert.Equal(t, test.want, got)
+		})
+	}
+}
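--
Reviewer note, not part of the patch: a caller's-eye sketch of the new helper,
assuming Velero's module path github.com/vmware-tanzu/velero and a plain logrus
logger. A pod stuck in ImagePullBackOff is reported unrecoverable, which is
what lets the controllers above cancel the CR instead of waiting for the
prepare timeout.

  package main

  import (
  	"github.com/sirupsen/logrus"
  	corev1 "k8s.io/api/core/v1"

  	"github.com/vmware-tanzu/velero/pkg/util/kube"
  )

  func main() {
  	// A pod whose only container is stuck pulling its image.
  	pod := &corev1.Pod{
  		Status: corev1.PodStatus{
  			Phase: corev1.PodPending,
  			ContainerStatuses: []corev1.ContainerStatus{
  				{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
  			},
  		},
  	}

  	if unrecoverable, reason := kube.IsPodUnrecoverable(pod, logrus.New()); unrecoverable {
  		// a controller would set Spec.Cancel = true on the CR here, as the
  		// hunks above do, so the data movement fails early
  		logrus.Infof("cancelling: %s", reason)
  	}
  }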