Mark in-progress CRs as failed when starting the server

Fixes #4953

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
pull/4966/head
Wenkai Yin(尹文开) 2022-06-06 22:56:29 +08:00
parent 9af031b84a
commit 6fa4d7d606
10 changed files with 204 additions and 186 deletions

View File

@ -22,7 +22,9 @@ import (
"net/http"
"os"
"strings"
"time"
"github.com/apex/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
@ -32,11 +34,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
@ -50,6 +54,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
@ -105,6 +110,7 @@ type resticServer struct {
metrics *metrics.ServerMetrics
metricsAddress string
namespace string
nodeName string
}
func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAddress string) (*resticServer, error) {
@ -121,11 +127,13 @@ func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAd
v1.AddToScheme(scheme)
storagev1api.AddToScheme(scheme)
nodeName := os.Getenv("NODE_NAME")
// use a field selector to filter to only pods scheduled on this node.
cacheOption := cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": os.Getenv("NODE_NAME")}.AsSelector(),
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
},
}
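
Note: the field-selector option above is what scopes this daemonset's informer cache to pods scheduled on its own node. A minimal sketch of the same wiring, assuming the controller-runtime release in use still exposes cache.SelectorsByObject and cache.BuilderWithOptions (the helper name newNodeScopedManager is illustrative, not part of this change):

package sketch

import (
    "os"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/manager"
)

// newNodeScopedManager builds a manager whose cache only ever sees pods
// scheduled on the node this process runs on; Get/List for pods on other
// nodes return NotFound or empty results.
func newNodeScopedManager(scheme *runtime.Scheme) (manager.Manager, error) {
    nodeName := os.Getenv("NODE_NAME")
    cacheOption := cache.Options{
        SelectorsByObject: cache.SelectorsByObject{
            &v1.Pod{}: {
                Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
            },
        },
    }
    return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:   scheme,
        NewCache: cache.BuilderWithOptions(cacheOption),
    })
}
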
@ -145,6 +153,7 @@ func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAd
mgr: mgr,
metricsAddress: metricAddress,
namespace: factory.Namespace(),
nodeName: nodeName,
}
// the cache isn't initialized yet when "validatePodVolumesHostPath" is called, the client returned by the manager cannot
@ -173,7 +182,9 @@ func (s *resticServer) run() {
}()
s.metrics = metrics.NewResticServerMetrics()
s.metrics.RegisterAllMetrics()
s.metrics.InitResticMetricsForNode(os.Getenv("NODE_NAME"))
s.metrics.InitResticMetricsForNode(s.nodeName)
s.markInProgressCRsFailed()
s.logger.Info("Starting controllers")
@ -193,7 +204,7 @@ func (s *resticServer) run() {
Clock: clock.RealClock{},
Metrics: s.metrics,
CredsFileStore: credentialFileStore,
NodeName: os.Getenv("NODE_NAME"),
NodeName: s.nodeName,
FileSystem: filesystem.NewFileSystem(),
ResticExec: restic.BackupExec{},
Log: s.logger,
@ -229,7 +240,7 @@ func (s *resticServer) validatePodVolumesHostPath(client kubernetes.Interface) e
}
}
pods, err := client.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", os.Getenv("NODE_NAME"))})
pods, err := client.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", s.nodeName)})
if err != nil {
return errors.WithStack(err)
}
@ -259,3 +270,83 @@ func (s *resticServer) validatePodVolumesHostPath(client kubernetes.Interface) e
return nil
}
// if the server restarts while pvbs/pvrs are being reconciled, those CRs may be stuck in the InProgress status
// markInProgressCRsFailed tries to mark such in-progress CRs as failed when the server starts to avoid that
func (s *resticServer) markInProgressCRsFailed() {
// this function is called before the controller manager starts, so the manager's embedded client isn't ready to use yet; create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
log.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
s.markInProgressPVBsFailed(client)
s.markInProgressPVRsFailed(client)
}
func (s *resticServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list podvolumebackups")
return
}
for _, pvb := range pvbs.Items {
if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
log.Debugf("the status of podvolumebackup %q is %q, skip", pvb.GetName(), pvb.Status.Phase)
continue
}
if pvb.Spec.Node != s.nodeName {
log.Debugf("the node of podvolumebackup %q is %q, not %q, skip", pvb.GetName(), pvb.Spec.Node, s.nodeName)
continue
}
updated := pvb.DeepCopy()
updated.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
updated.Status.Message = fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, updated.Status.Phase)
updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
if err := kube.Patch(s.ctx, &pvb, updated, client); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
continue
}
log.WithField("podvolumebackup", pvb.GetName()).Warn(updated.Status.Message)
}
}
func (s *resticServer) markInProgressPVRsFailed(client ctrlclient.Client) {
pvrs := &velerov1api.PodVolumeRestoreList{}
if err := client.List(s.ctx, pvrs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list podvolumerestores")
return
}
for _, pvr := range pvrs.Items {
if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
log.Debugf("the status of podvolumerestore %q is %q, skip", pvr.GetName(), pvr.Status.Phase)
continue
}
pod := &v1.Pod{}
if err := client.Get(s.ctx, types.NamespacedName{
Namespace: pvr.Spec.Pod.Namespace,
Name: pvr.Spec.Pod.Name,
}, pod); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to get pod \"%s/%s\" of podvolumerestore %q",
pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name, pvr.GetName())
continue
}
if pod.Spec.NodeName != s.nodeName {
log.Debugf("the node of pod referenced by podvolumebackup %q is %q, not %q, skip", pvr.GetName(), pod.Spec.NodeName, s.nodeName)
continue
}
updated := pvr.DeepCopy()
updated.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
updated.Status.Message = fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, updated.Status.Phase)
updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
if err := kube.Patch(s.ctx, &pvr, updated, client); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())
continue
}
log.WithField("podvolumerestore", pvr.GetName()).Warn(updated.Status.Message)
}
}
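
Note: kube.Patch above is a Velero helper; the same status flip can be expressed with plain controller-runtime primitives. A rough, self-contained sketch of the pattern (markPVBFailed is a made-up name, and it assumes status is patched on the main resource rather than a status subresource, matching the calls above):

package sketch

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// markPVBFailed records the failure on a deep copy and sends a merge patch
// computed against the original object, so only the changed status fields
// go over the wire.
func markPVBFailed(ctx context.Context, c ctrlclient.Client, pvb *velerov1api.PodVolumeBackup, msg string) error {
    updated := pvb.DeepCopy()
    updated.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
    updated.Status.Message = msg
    updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
    return c.Patch(ctx, updated, ctrlclient.MergeFrom(pvb))
}
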

View File

@ -74,6 +74,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"github.com/vmware-tanzu/velero/internal/storage"
@ -378,6 +379,8 @@ func (s *server) run() error {
return err
}
markInProgressCRsFailed(s.ctx, s.mgr.GetConfig(), s.mgr.GetScheme(), s.namespace, s.logger)
if err := s.runControllers(s.config.defaultVolumeSnapshotLocations); err != nil {
return err
}
@ -925,3 +928,65 @@ func (w *CSIInformerFactoryWrapper) WaitForCacheSync(stopCh <-chan struct{}) map
}
return nil
}
// if the server restarts while backups/restores are being reconciled, those CRs may be stuck in the InProgress status
// markInProgressCRsFailed tries to mark such in-progress CRs as failed when the server starts to avoid that
func markInProgressCRsFailed(ctx context.Context, cfg *rest.Config, scheme *runtime.Scheme, namespace string, log logrus.FieldLogger) {
// this function is called before the controller manager starts, so the manager's embedded client isn't ready to use yet; create a new one here
client, err := ctrlclient.New(cfg, ctrlclient.Options{Scheme: scheme})
if err != nil {
log.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
markInProgressBackupsFailed(ctx, client, namespace, log)
markInProgressRestoresFailed(ctx, client, namespace, log)
}
func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client, namespace string, log logrus.FieldLogger) {
backups := &velerov1api.BackupList{}
if err := client.List(ctx, backups, &ctrlclient.MatchingFields{"metadata.namespace": namespace}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list backups")
return
}
for _, backup := range backups.Items {
if backup.Status.Phase != velerov1api.BackupPhaseInProgress {
log.Debugf("the status of backup %q is %q, skip", backup.GetName(), backup.Status.Phase)
continue
}
updated := backup.DeepCopy()
updated.Status.Phase = velerov1api.BackupPhaseFailed
updated.Status.FailureReason = fmt.Sprintf("get a backup with status %q during the server starting, mark it as %q", velerov1api.BackupPhaseInProgress, updated.Status.Phase)
updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&backup)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to patch backup %q", backup.GetName())
continue
}
log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
}
}
func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client, namespace string, log logrus.FieldLogger) {
restores := &velerov1api.RestoreList{}
if err := client.List(ctx, restores, &ctrlclient.MatchingFields{"metadata.namespace": namespace}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list restores")
return
}
for _, restore := range restores.Items {
if restore.Status.Phase != velerov1api.RestorePhaseInProgress {
log.Debugf("the status of restore %q is %q, skip", restore.GetName(), restore.Status.Phase)
continue
}
updated := restore.DeepCopy()
updated.Status.Phase = velerov1api.RestorePhaseFailed
updated.Status.FailureReason = fmt.Sprintf("get a restore with status %q during the server starting, mark it as %q", velerov1api.RestorePhaseInProgress, updated.Status.Phase)
updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&restore)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to patch restore %q", restore.GetName())
continue
}
log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
}
}
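
Note: both markInProgressCRsFailed variants build a throwaway, non-cached client because the manager's cache has not started yet at this point. A hypothetical sketch of that startup scan for backups, using ctrlclient.InNamespace as a more conventional spelling of the MatchingFields{"metadata.namespace": ...} option used above (listInProgressBackups is an illustrative helper, not part of the change):

package sketch

import (
    "context"

    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/rest"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// listInProgressBackups builds a direct client against the API server (no
// cache involved) and returns the backups in the given namespace that are
// still marked InProgress, i.e. the ones the server would flip to Failed.
func listInProgressBackups(ctx context.Context, cfg *rest.Config, scheme *runtime.Scheme, namespace string) ([]velerov1api.Backup, error) {
    c, err := ctrlclient.New(cfg, ctrlclient.Options{Scheme: scheme})
    if err != nil {
        return nil, err
    }
    backups := &velerov1api.BackupList{}
    if err := c.List(ctx, backups, ctrlclient.InNamespace(namespace)); err != nil {
        return nil, err
    }
    var inProgress []velerov1api.Backup
    for _, b := range backups.Items {
        if b.Status.Phase == velerov1api.BackupPhaseInProgress {
            inProgress = append(inProgress, b)
        }
    }
    return inProgress, nil
}

Both forms end up restricting the LIST call to Velero's own namespace; the difference is largely stylistic.
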

View File

@ -157,12 +157,13 @@ func NewBackupController(
backup := obj.(*velerov1api.Backup)
switch backup.Status.Phase {
case "", velerov1api.BackupPhaseNew, velerov1api.BackupPhaseInProgress:
case "", velerov1api.BackupPhaseNew:
// only process new backups
default:
c.logger.WithFields(logrus.Fields{
"backup": kubeutil.NamespaceAndName(backup),
"phase": backup.Status.Phase,
}).Debug("Backup is not new or in-progress, skipping")
}).Debug("Backup is not new, skipping")
return
}
@ -250,22 +251,7 @@ func (c *backupController) processBackup(key string) error {
// this key (even though it was a no-op).
switch original.Status.Phase {
case "", velerov1api.BackupPhaseNew:
case velerov1api.BackupPhaseInProgress:
// A backup may stay in-progress forever because of
// 1) the controller restarts during the processing of a backup
// 2) the backup with in-progress status isn't updated to completed or failed status successfully
// So we try to mark such Backups as failed to avoid it
updated := original.DeepCopy()
updated.Status.Phase = velerov1api.BackupPhaseFailed
updated.Status.FailureReason = fmt.Sprintf("got a Backup with unexpected status %q, this may be due to a restart of the controller during the backing up, mark it as %q",
velerov1api.BackupPhaseInProgress, updated.Status.Phase)
updated.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
_, err = patchBackup(original, updated, c.client)
if err != nil {
return errors.Wrapf(err, "error updating Backup status to %s", updated.Status.Phase)
}
log.Warn(updated.Status.FailureReason)
return nil
// only process new backups
default:
return nil
}
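
Note: the hunk at the top of this file only shows the phase switch inside the informer's AddFunc. For context, a sketch of how that filter typically sits in a plain client-go event handler feeding a workqueue (illustrative wiring, assuming the logrus and workqueue packages already used by this controller):

package sketch

import (
    "github.com/sirupsen/logrus"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// newBackupAddHandler mirrors the event-handler change above: only backups in
// the "" or New phase are enqueued; everything else, including InProgress
// (now handled at server startup), is skipped with a debug log.
func newBackupAddHandler(queue workqueue.Interface, log logrus.FieldLogger) cache.ResourceEventHandlerFuncs {
    return cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            backup := obj.(*velerov1api.Backup)
            switch backup.Status.Phase {
            case "", velerov1api.BackupPhaseNew:
                // only process new backups
            default:
                log.WithFields(logrus.Fields{
                    "backup": backup.Namespace + "/" + backup.Name,
                    "phase":  backup.Status.Phase,
                }).Debug("Backup is not new, skipping")
                return
            }
            key, err := cache.MetaNamespaceKeyFunc(backup)
            if err != nil {
                log.WithError(err).Error("error creating queue key")
                return
            }
            queue.Add(key)
        },
    }
}
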

View File

@ -94,6 +94,11 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
key: "velero/backup-1",
backup: defaultBackup().Phase(velerov1api.BackupPhaseFailedValidation).Result(),
},
{
name: "InProgress backup is not processed",
key: "velero/backup-1",
backup: defaultBackup().Phase(velerov1api.BackupPhaseInProgress).Result(),
},
{
name: "Completed backup is not processed",
key: "velero/backup-1",
@ -135,28 +140,6 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
}
}
func TestMarkInProgressBackupAsFailed(t *testing.T) {
backup := defaultBackup().Phase(velerov1api.BackupPhaseInProgress).Result()
clientset := fake.NewSimpleClientset(backup)
sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
logger := logging.DefaultLogger(logrus.DebugLevel, logging.FormatText)
c := &backupController{
genericController: newGenericController("backup-test", logger),
client: clientset.VeleroV1(),
lister: sharedInformers.Velero().V1().Backups().Lister(),
clock: &clock.RealClock{},
}
require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(backup))
err := c.processBackup(fmt.Sprintf("%s/%s", backup.Namespace, backup.Name))
require.Nil(t, err)
res, err := clientset.VeleroV1().Backups(backup.Namespace).Get(context.TODO(), backup.Name, metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, velerov1api.BackupPhaseFailed, res.Status.Phase)
}
func TestProcessBackupValidationFailures(t *testing.T) {
defaultBackupLocation := builder.ForBackupStorageLocation("velero", "loc-1").Result()

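Note: with the controller no longer flipping InProgress backups, the deleted TestMarkInProgressBackupAsFailed has no direct replacement here; the behaviour moved to server startup. A hypothetical test against a fake controller-runtime client could look roughly like this (it replays the list-then-merge-patch sequence instead of calling the unexported server helper):

package sketch

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

func TestStartupMarksInProgressBackupFailed(t *testing.T) {
    scheme := runtime.NewScheme()
    require.NoError(t, velerov1api.AddToScheme(scheme))

    // Seed the fake client with a backup stuck in the InProgress phase.
    backup := &velerov1api.Backup{
        ObjectMeta: metav1.ObjectMeta{Namespace: "velero", Name: "backup-1"},
        Status:     velerov1api.BackupStatus{Phase: velerov1api.BackupPhaseInProgress},
    }
    c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(backup).Build()

    // Replay the startup logic: list, filter on phase, merge-patch to Failed.
    backups := &velerov1api.BackupList{}
    require.NoError(t, c.List(context.Background(), backups, ctrlclient.InNamespace("velero")))
    for i := range backups.Items {
        b := backups.Items[i]
        if b.Status.Phase != velerov1api.BackupPhaseInProgress {
            continue
        }
        updated := b.DeepCopy()
        updated.Status.Phase = velerov1api.BackupPhaseFailed
        require.NoError(t, c.Patch(context.Background(), updated, ctrlclient.MergeFrom(&b)))
    }

    res := &velerov1api.Backup{}
    require.NoError(t, c.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: "backup-1"}, res))
    assert.Equal(t, velerov1api.BackupPhaseFailed, res.Status.Phase)
}
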
View File

@ -93,20 +93,9 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
switch pvb.Status.Phase {
case "", velerov1api.PodVolumeBackupPhaseNew:
case velerov1api.PodVolumeBackupPhaseInProgress:
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = fmt.Sprintf("got a PodVolumeBackup with unexpected status %q, this may be due to a restart of the controller during the backing up, mark it as %q",
velerov1api.PodVolumeBackupPhaseInProgress, pvb.Status.Phase)
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := kube.Patch(ctx, original, &pvb, r.Client); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
return ctrl.Result{}, err
}
log.Warn(pvb.Status.Message)
return ctrl.Result{}, nil
// Only process new items.
default:
log.Debug("PodVolumeBackup is not new or in-progress, not processing")
log.Debug("PodVolumeBackup is not new, not processing")
return ctrl.Result{}, nil
}
@ -298,7 +287,7 @@ func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.Po
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = msg
pvb.Status.Message = errors.WithMessage(err, msg).Error()
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err = kube.Patch(ctx, original, pvb, r.Client); err != nil {

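Note on the updateStatusToFailed change above: errors.WithMessage from github.com/pkg/errors prefixes the annotation, so the status message now carries both the caller's context and the underlying error. A tiny illustration with made-up values:

package main

import (
    "fmt"

    "github.com/pkg/errors"
)

func main() {
    // errors.WithMessage(err, msg).Error() renders as "<msg>: <err>".
    err := errors.New("restic: exit status 1")
    msg := "running Restic backup"
    fmt.Println(errors.WithMessage(err, msg).Error())
    // prints: running Restic backup: restic: exit status 1
}
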
View File

@ -179,16 +179,16 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("in progress phase pvb on same node should be marked as failed", request{
Entry("in progress phase pvb on same node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Node("test_node").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: true,
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Result(),
expectedRequeue: ctrl.Result{},
}),

View File

@ -134,6 +134,11 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) {
if !isPVRNew(pvr) {
log.Debug("PodVolumeRestore is not new, skip")
return false, nil, nil
}
// we filter the pods during the initialization of cache, if we can get a pod here, the pod must be in the same node with the controller
// so we don't need to compare the node anymore
pod := &corev1api.Pod{}
@ -146,28 +151,6 @@ func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logr
return false, nil, err
}
// the status checking logic must be put after getting the PVR's pod because that the getting pod logic
// makes sure the PVR's pod is on the same node with the controller. The controller should only process
// the PVRs on the same node
switch pvr.Status.Phase {
case "", velerov1api.PodVolumeRestorePhaseNew:
case velerov1api.PodVolumeRestorePhaseInProgress:
original := pvr.DeepCopy()
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = fmt.Sprintf("got a PodVolumeRestore with unexpected status %q, this may be due to a restart of the controller during the restoring, mark it as %q",
velerov1api.PodVolumeRestorePhaseInProgress, pvr.Status.Phase)
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if err := kube.Patch(ctx, original, pvr, c.Client); err != nil {
log.WithError(err).Error("Unable to update status to failed")
return false, nil, err
}
log.Warn(pvr.Status.Message)
return false, nil, nil
default:
log.Debug("PodVolumeRestore is not new or in-progress, skip")
return false, nil, nil
}
if !isResticInitContainerRunning(pod) {
log.Debug("Pod is not running restic-wait init container, skip")
return false, nil, nil
@ -209,6 +192,10 @@ func (c *PodVolumeRestoreReconciler) findVolumeRestoresForPod(pod client.Object)
return requests
}
func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew
}
func isResticInitContainerRunning(pod *corev1api.Pod) bool {
// Restic wait container can be anywhere in the list of init containers, but must be running.
i := getResticInitContainerIndex(pod)

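Note: dropping the node comparison leans on the node-scoped cache set up in the restic server change, where a PodVolumeRestore whose pod runs on another node simply yields NotFound when the pod is fetched. A hedged sketch of making that NotFound tolerance explicit (podOnThisNode is an illustrative helper, not part of the change):

package sketch

import (
    "context"

    corev1api "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/types"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// podOnThisNode fetches the PVR's pod through a cache that only holds pods on
// the local node; NotFound therefore means "not ours to process" rather than
// being treated as a hard error.
func podOnThisNode(ctx context.Context, c ctrlclient.Client, pvr *velerov1api.PodVolumeRestore) (*corev1api.Pod, bool, error) {
    pod := &corev1api.Pod{}
    err := c.Get(ctx, types.NamespacedName{
        Namespace: pvr.Spec.Pod.Namespace,
        Name:      pvr.Spec.Pod.Name,
    }, pod)
    if apierrors.IsNotFound(err) {
        return nil, false, nil
    }
    if err != nil {
        return nil, false, err
    }
    return pod, true, nil
}
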
View File

@ -27,7 +27,6 @@ import (
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@ -44,48 +43,15 @@ func TestShouldProcess(t *testing.T) {
obj *velerov1api.PodVolumeRestore
pod *corev1api.Pod
shouldProcessed bool
expectedPhase velerov1api.PodVolumeRestorePhase
}{
{
name: "Unable to get pvr's pod should not be processed",
name: "InProgress phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: "",
},
},
shouldProcessed: false,
},
{
name: "InProgress phase pvr should be marked as failed",
obj: &velerov1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseInProgress,
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
},
},
shouldProcessed: false,
expectedPhase: velerov1api.PodVolumeRestorePhaseFailed,
},
{
name: "Completed phase pvr should not be processed",
@ -94,22 +60,10 @@ func TestShouldProcess(t *testing.T) {
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseCompleted,
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
},
},
shouldProcessed: false,
},
{
@ -119,6 +73,15 @@ func TestShouldProcess(t *testing.T) {
Namespace: "velero",
Name: "pvr-1",
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseFailed,
},
},
shouldProcessed: false,
},
{
name: "Unable to get pvr's pod should not be processed",
obj: &velerov1api.PodVolumeRestore{
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
@ -126,13 +89,7 @@ func TestShouldProcess(t *testing.T) {
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseFailed,
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
Phase: "",
},
},
shouldProcessed: false,
@ -244,12 +201,6 @@ func TestShouldProcess(t *testing.T) {
shouldProcess, _, _ := c.shouldProcess(ctx, c.logger, ts.obj)
require.Equal(t, ts.shouldProcessed, shouldProcess)
if len(ts.expectedPhase) > 0 {
pvr := &velerov1api.PodVolumeRestore{}
err := c.Client.Get(ctx, types.NamespacedName{Namespace: ts.obj.Namespace, Name: ts.obj.Name}, pvr)
require.Nil(t, err)
assert.Equal(t, ts.expectedPhase, pvr.Status.Phase)
}
})
}
}

View File

@ -144,12 +144,13 @@ func NewRestoreController(
restore := obj.(*api.Restore)
switch restore.Status.Phase {
case "", api.RestorePhaseNew, api.RestorePhaseInProgress:
case "", api.RestorePhaseNew:
// only process new restores
default:
c.logger.WithFields(logrus.Fields{
"restore": kubeutil.NamespaceAndName(restore),
"phase": restore.Status.Phase,
}).Debug("Restore is not new or in-progress, skipping")
}).Debug("Restore is not new, skipping")
return
}
@ -201,21 +202,7 @@ func (c *restoreController) processQueueItem(key string) error {
// is ("" | New)
switch restore.Status.Phase {
case "", api.RestorePhaseNew:
case api.RestorePhaseInProgress:
// A restore may stay in-progress forever because of
// 1) the controller restarts during the processing of a restore
// 2) the restore with in-progress status isn't updated to completed or failed status successfully
// So we try to mark such restores as failed to avoid it
updated := restore.DeepCopy()
updated.Status.Phase = api.RestorePhaseFailed
updated.Status.FailureReason = fmt.Sprintf("got a Restore with unexpected status %q, this may be due to a restart of the controller during the restore, mark it as %q",
api.RestorePhaseInProgress, updated.Status.Phase)
_, err = patchRestore(restore, updated, c.restoreClient)
if err != nil {
return errors.Wrapf(err, "error updating Restore status to %s", updated.Status.Phase)
}
log.Warn(updated.Status.FailureReason)
return nil
// only process new restores
default:
return nil
}

View File

@ -20,7 +20,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"testing"
"time"
@ -171,6 +170,11 @@ func TestProcessQueueItemSkips(t *testing.T) {
restoreKey: "foo/bar",
expectError: true,
},
{
name: "restore with phase InProgress does not get processed",
restoreKey: "foo/bar",
restore: builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseInProgress).Result(),
},
{
name: "restore with phase Completed does not get processed",
restoreKey: "foo/bar",
@ -222,31 +226,6 @@ func TestProcessQueueItemSkips(t *testing.T) {
}
}
func TestMarkInProgressRestoreAsFailed(t *testing.T) {
var (
restore = builder.ForRestore("velero", "bar").Phase(velerov1api.RestorePhaseInProgress).Result()
client = fake.NewSimpleClientset(restore)
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
)
c := restoreController{
genericController: newGenericController("restore-test", logger),
restoreClient: client.VeleroV1(),
restoreLister: sharedInformers.Velero().V1().Restores().Lister(),
}
err := sharedInformers.Velero().V1().Restores().Informer().GetStore().Add(restore)
require.Nil(t, err)
err = c.processQueueItem(fmt.Sprintf("%s/%s", restore.Namespace, restore.Name))
require.Nil(t, err)
res, err := c.restoreClient.Restores(restore.Namespace).Get(context.Background(), restore.Name, metav1.GetOptions{})
require.Nil(t, err)
assert.Equal(t, velerov1api.RestorePhaseFailed, res.Status.Phase)
}
func TestProcessQueueItem(t *testing.T) {
defaultStorageLocation := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()