Merge pull request #7052 from qiuming-best/data-mover-fail-early

Make data mover fail early

commit 2fa785a3dd

New changelog entry:
@@ -0,0 +1,2 @@
+Make data mover fail early
@@ -106,3 +106,8 @@ func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder {
     b.object.Status.Phase = phase
     return b
 }
+
+func (b *PodBuilder) Status(status corev1api.PodStatus) *PodBuilder {
+    b.object.Status = status
+    return b
+}
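The new Status setter complements the existing Phase setter, so tests can build a pod with a full PodStatus in a single chain. A minimal sketch of how the test hunks further down use it; the namespace and names are placeholders, and the import aliases (builder for Velero's pkg/builder, velerov1api for its velero/v1 API package, corev1api for k8s.io/api/core/v1) are assumptions, not part of this diff:

```go
// Build a pod that the data mover controllers will treat as Running.
pod := builder.ForPod("velero", "dataupload-pod").
	Labels(map[string]string{velerov1api.DataUploadLabel: "my-dataupload"}).
	Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).
	Result()
_ = pod // used as the input pod in findDataUploadForPod / findSnapshotRestoreForPod tests
```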
@@ -486,10 +486,6 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
                 return false
             }
 
-            if newObj.Status.Phase != v1.PodRunning {
-                return false
-            }
-
             if newObj.Spec.NodeName == "" {
                 return false
             }
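With the Running-phase filter removed, pod updates for failed or stuck exposing pods now reach the mapping handler below, which is where the new fail-early logic lives. For orientation, a minimal sketch of a controller-runtime update predicate of this shape; the label-key check and the Watches wiring sit outside this hunk and are assumptions here (import aliases assumed: predicate and event from sigs.k8s.io/controller-runtime, v1 for k8s.io/api/core/v1):

```go
// Illustrative predicate only; the full SetupWithManager wiring is not shown in this diff.
podUpdated := predicate.Funcs{
	UpdateFunc: func(ue event.UpdateEvent) bool {
		newObj, ok := ue.ObjectNew.(*v1.Pod)
		if !ok {
			return false
		}
		// Only pods labelled as belonging to a DataDownload are relevant (assumed label check).
		if _, found := newObj.Labels[velerov1api.DataDownloadLabel]; !found {
			return false
		}
		// No Status.Phase == Running filter any more: abnormal pods must also reach
		// findSnapshotRestoreForPod so the DataDownload can be cancelled early.
		if newObj.Spec.NodeName == "" {
			return false
		}
		return true
	},
}
_ = podUpdated // would be passed via builder.WithPredicates(...) when registering the Pod watch
```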
@@ -511,43 +507,55 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 
 func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
     pod := podObj.(*v1.Pod)
 
     dd, err := findDataDownloadByPod(r.client, *pod)
+
+    log := r.logger.WithField("pod", pod.Name)
     if err != nil {
-        r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
+        log.WithError(err).Error("unable to get DataDownload")
         return []reconcile.Request{}
     } else if dd == nil {
-        r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
+        log.Error("get empty DataDownload")
         return []reconcile.Request{}
     }
+    log = log.WithFields(logrus.Fields{
+        "Dataddownload": dd.Name,
+    })
 
     if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
         return []reconcile.Request{}
     }
 
-    requests := make([]reconcile.Request, 1)
-
-    r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
-
-    // we don't expect anyone else update the CR during the Prepare process
-    updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
-    if err != nil || !updated {
-        r.logger.WithFields(logrus.Fields{
-            "Datadownload": dd.Name,
-            "Restore pod":  pod.Name,
-            "updated":      updated,
-        }).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
-        return []reconcile.Request{}
+    if pod.Status.Phase == v1.PodRunning {
+        log.Info("Preparing data download")
+        // we don't expect anyone else update the CR during the Prepare process
+        updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
+        if err != nil || !updated {
+            log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
+            return []reconcile.Request{}
+        }
+    } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
+        err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownlad", dd.Name),
+            func(dataDownload *velerov2alpha1api.DataDownload) {
+                dataDownload.Spec.Cancel = true
+                dataDownload.Status.Message = fmt.Sprintf("datadownload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for %s", pod.Namespace, pod.Name, reason)
+            })
+
+        if err != nil {
+            log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
+            return []reconcile.Request{}
+        }
+        log.Info("Exposed pod is in abnormal status, and datadownload is marked as cancel")
+    } else {
+        return []reconcile.Request{}
     }
 
-    requests[0] = reconcile.Request{
+    request := reconcile.Request{
         NamespacedName: types.NamespacedName{
             Namespace: dd.Namespace,
             Name:      dd.Name,
         },
     }
 
-    return requests
+    return []reconcile.Request{request}
 }
 
 func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
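The cancel path goes through UpdateDataDownloadWithRetry, whose implementation is not part of the hunks shown here. As a rough sketch of the shape such a helper typically has, using client-go's retry.RetryOnConflict; the PR's actual helper may well differ, and the import aliases are assumptions:

```go
// Sketch only -- the real UpdateDataDownloadWithRetry is not shown in this diff.
// Assumed imports: "context", "k8s.io/apimachinery/pkg/types",
// "k8s.io/client-go/util/retry", "sigs.k8s.io/controller-runtime/pkg/client",
// "github.com/pkg/errors", "github.com/sirupsen/logrus",
// velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1".
func UpdateDataDownloadWithRetry(ctx context.Context, cli client.Client, namespacedName types.NamespacedName,
	log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataDownload)) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		dd := &velerov2alpha1api.DataDownload{}
		if err := cli.Get(ctx, namespacedName, dd); err != nil {
			return errors.Wrap(err, "getting DataDownload")
		}
		// Apply the caller's mutation (here: set Spec.Cancel and Status.Message)
		// and write it back; a conflict triggers a fresh Get and another attempt.
		// If the CRD enables the status subresource, the Status.Message change
		// would additionally need cli.Status().Update.
		updateFunc(dd)
		if err := cli.Update(ctx, dd); err != nil {
			log.WithError(err).Debug("error updating DataDownload, will retry on conflict")
			return err
		}
		return nil
	})
}
```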
@@ -632,7 +632,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
         {
             name: "find dataDownload for pod",
             du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
-            pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
+            pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
             checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
                 // Assert that the function returns a single request
                 assert.Len(t, requests, 1)
@@ -521,10 +521,6 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
                 return false
             }
 
-            if newObj.Status.Phase != corev1.PodRunning {
-                return false
-            }
-
             if newObj.Spec.NodeName != r.nodeName {
                 return false
             }
@@ -547,37 +543,56 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
     pod := podObj.(*corev1.Pod)
     du, err := findDataUploadByPod(r.client, *pod)
+    log := r.logger.WithFields(logrus.Fields{
+        "Backup pod": pod.Name,
+    })
+
     if err != nil {
-        r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
+        log.WithError(err).Error("unable to get dataupload")
         return []reconcile.Request{}
     } else if du == nil {
-        r.logger.WithField("Backup pod", pod.Name).Error("get empty DataUpload")
+        log.Error("get empty DataUpload")
         return []reconcile.Request{}
     }
+    log = log.WithFields(logrus.Fields{
+        "Datadupload": du.Name,
+    })
 
     if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
         return []reconcile.Request{}
     }
 
-    r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
-    // we don't expect anyone else update the CR during the Prepare process
-    updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
-    if err != nil || !updated {
-        r.logger.WithFields(logrus.Fields{
-            "Dataupload": du.Name,
-            "Backup pod": pod.Name,
-            "updated":    updated,
-        }).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
+    if pod.Status.Phase == corev1.PodRunning {
+        log.Info("Preparing dataupload")
+        // we don't expect anyone else update the CR during the Prepare process
+        updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
+        if err != nil || !updated {
+            log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
+            return []reconcile.Request{}
+        }
+    } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early
+        err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
+            func(dataUpload *velerov2alpha1api.DataUpload) {
+                dataUpload.Spec.Cancel = true
+                dataUpload.Status.Message = fmt.Sprintf("dataupload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
+            })
+
+        if err != nil {
+            log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
+            return []reconcile.Request{}
+        }
+        log.Info("Exposed pod is in abnormal status and dataupload is marked as cancel")
+    } else {
         return []reconcile.Request{}
     }
 
-    requests := reconcile.Request{
+    request := reconcile.Request{
         NamespacedName: types.NamespacedName{
             Namespace: du.Namespace,
             Name:      du.Name,
         },
     }
-    return []reconcile.Request{requests}
+    return []reconcile.Request{request}
 }
 
 func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
@@ -683,7 +683,7 @@ func TestFindDataUploadForPod(t *testing.T) {
         {
             name: "find dataUpload for pod",
             du:   dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
-            pod:  builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(),
+            pod:  builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
             checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
                 // Assert that the function returns a single request
                 assert.Len(t, requests, 1)
@@ -17,6 +17,7 @@ package kube
 
 import (
     "context"
+    "fmt"
     "time"
 
     "github.com/pkg/errors"
@@ -110,3 +111,23 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface
 
     return nil
 }
+
+// IsPodUnrecoverable checks if the pod is in an abnormal state and could not be recovered
+// It could not cover all the cases but we could add more cases in the future
+func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, string) {
+    // Check the Phase field
+    if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown {
+        log.Warnf("Pod is in abnormal state %s", pod.Status.Phase)
+        return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase)
+    }
+
+    // Check the Status field
+    for _, containerStatus := range pod.Status.ContainerStatuses {
+        // If the container's image state is ImagePullBackOff, it indicates an image pull failure
+        if containerStatus.State.Waiting != nil && (containerStatus.State.Waiting.Reason == "ImagePullBackOff" || containerStatus.State.Waiting.Reason == "ErrImageNeverPull") {
+            log.Warnf("Container %s in Pod %s/%s is in pull image failed with reason %s", containerStatus.Name, pod.Namespace, pod.Name, containerStatus.State.Waiting.Reason)
+            return true, fmt.Sprintf("Container %s in Pod %s/%s is in pull image failed with reason %s", containerStatus.Name, pod.Namespace, pod.Name, containerStatus.State.Waiting.Reason)
+        }
+    }
+    return false, ""
+}
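For reference, a small illustration of how a caller can consume the (unrecoverable, reason) pair; this mirrors the fail-early branches in the two controllers above. The pod and logger variables are placeholders, not part of this diff:

```go
// Hypothetical caller; pod is a *corev1api.Pod obtained elsewhere.
log := logrus.New().WithField("pod", pod.Name)
if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
	// Instead of waiting for the prepare timeout, mark the owning
	// DataUpload/DataDownload as cancelled and record the reason.
	fmt.Printf("exposing pod is unrecoverable, cancelling early: %s\n", reason)
}
```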
@@ -343,3 +343,82 @@ func TestDeletePodIfAny(t *testing.T) {
         })
     }
 }
+
+func TestIsPodUnrecoverable(t *testing.T) {
+    tests := []struct {
+        name string
+        pod  *corev1api.Pod
+        want bool
+    }{
+        {
+            name: "pod is in failed state",
+            pod: &corev1api.Pod{
+                Status: corev1api.PodStatus{
+                    Phase: corev1api.PodFailed,
+                },
+            },
+            want: true,
+        },
+        {
+            name: "pod is in unknown state",
+            pod: &corev1api.Pod{
+                Status: corev1api.PodStatus{
+                    Phase: corev1api.PodUnknown,
+                },
+            },
+            want: true,
+        },
+        {
+            name: "container image pull failure",
+            pod: &corev1api.Pod{
+                Status: corev1api.PodStatus{
+                    ContainerStatuses: []corev1api.ContainerStatus{
+                        {State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
+                    },
+                },
+            },
+            want: true,
+        },
+        {
+            name: "container image pull failure with different reason",
+            pod: &corev1api.Pod{
+                Status: corev1api.PodStatus{
+                    ContainerStatuses: []corev1api.ContainerStatus{
+                        {State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ErrImageNeverPull"}}},
+                    },
+                },
+            },
+            want: true,
+        },
+        {
+            name: "container image pull failure with different reason",
+            pod: &corev1api.Pod{
+                Status: corev1api.PodStatus{
+                    ContainerStatuses: []corev1api.ContainerStatus{
+                        {State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "OtherReason"}}},
+                    },
+                },
+            },
+            want: false,
+        },
+        {
+            name: "pod is normal",
+            pod: &corev1api.Pod{
+                Status: corev1api.PodStatus{
+                    Phase: corev1api.PodRunning,
+                    ContainerStatuses: []corev1api.ContainerStatus{
+                        {Ready: true, State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}},
+                    },
+                },
+            },
+            want: false,
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            got, _ := IsPodUnrecoverable(test.pod, velerotest.NewLogger())
+            assert.Equal(t, test.want, got)
+        })
+    }
+}