Merge pull request #4893 from ywk253100/220506_restart

Mark in-progress PVB/PVR as failed when restic controller restarts to avoid hanging backup/restore
Xun Jiang/Bruce Jiang 2022-05-07 17:32:40 +08:00 committed by GitHub
commit 62dde34f86
7 changed files with 224 additions and 114 deletions

View File

@ -0,0 +1 @@
Mark in-progress PVB/PVR as failed when restic controller restarts to avoid hanging backup/restore
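
In practical terms, a PodVolumeBackup or PodVolumeRestore found in the InProgress phase at reconcile time is now treated as an orphan left over from a controller restart and marked Failed, rather than skipped (which previously left the owning backup/restore hanging). Below is a minimal standalone sketch of that pattern; the function name and message are illustrative only and not part of this diff — the real logic lives in the Reconcile/shouldProcess switches further down.

package example // illustrative only, not part of this PR

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// markStaleInProgressAsFailed mirrors the new controller behavior: an item
// already InProgress when reconciled is assumed to come from an interrupted
// run and is patched to Failed so the backup/restore can finish with an error
// instead of hanging.
func markStaleInProgressAsFailed(ctx context.Context, c client.Client, clk clock.Clock, pvb *velerov1api.PodVolumeBackup) error {
	if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
		return nil
	}
	original := pvb.DeepCopy()
	pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
	pvb.Status.Message = "controller restarted while the backup was in progress"
	pvb.Status.CompletionTimestamp = &metav1.Time{Time: clk.Now()}
	return kube.Patch(ctx, original, pvb, c)
}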

View File

@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -87,37 +86,40 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
log.Info("PodVolumeBackup starting")
// Initialize the patch helper.
patchHelper, err := patch.NewHelper(&pvb, r.Client)
if err != nil {
log.WithError(err).Error("getting patch helper to update this resource")
return ctrl.Result{}, errors.WithStack(err)
}
defer func() {
// Always attempt to patch the PVB object and status after each reconciliation.
if err := patchHelper.Patch(ctx, &pvb); err != nil {
log.WithError(err).Error("updating PodVolumeBackup resource")
return
}
}()
// Only process items for this node.
if pvb.Spec.Node != r.NodeName {
return ctrl.Result{}, nil
}
// Only process new items.
if pvb.Status.Phase != "" && pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseNew {
log.Debug("PodVolumeBackup is not new, not processing")
switch pvb.Status.Phase {
case "", velerov1api.PodVolumeBackupPhaseNew:
case velerov1api.PodVolumeBackupPhaseInProgress:
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = fmt.Sprintf("got a PodVolumeBackup with unexpected status %q, this may be due to a restart of the controller during the backing up, mark it as %q",
velerov1api.PodVolumeBackupPhaseInProgress, pvb.Status.Phase)
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := kube.Patch(ctx, original, &pvb, r.Client); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
return ctrl.Result{}, err
}
log.Warn(pvb.Status.Message)
return ctrl.Result{}, nil
default:
log.Debug("PodVolumeBackup is not new or in-progress, not processing")
return ctrl.Result{}, nil
}
r.Metrics.RegisterPodVolumeBackupEnqueue(r.NodeName)
// Update status to InProgress.
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress
pvb.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := kube.Patch(ctx, original, &pvb, r.Client); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
return ctrl.Result{}, err
}
var pod corev1.Pod
podNamespacedName := client.ObjectKey{
@ -125,13 +127,13 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
Name: pvb.Spec.Pod.Name,
}
if err := r.Client.Get(ctx, podNamespacedName, &pod); err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name))
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log)
}
var resticDetails resticDetails
resticCmd, err := r.buildResticCommand(ctx, log, &pvb, &pod, &resticDetails)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "building Restic command")
return r.updateStatusToFailed(ctx, &pvb, err, "building Restic command", log)
}
defer os.Remove(resticDetails.credsFile)
@ -159,7 +161,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if strings.Contains(stderr, "snapshot is empty") {
emptySnapshot = true
} else {
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%s", stderr))
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%s", stderr), log)
}
}
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
@ -177,11 +179,12 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
snapshotID, err = r.ResticExec.GetSnapshotID(cmd)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "getting snapshot id")
return r.updateStatusToFailed(ctx, &pvb, err, "getting snapshot id", log)
}
}
// Update status to Completed with path & snapshot ID.
original = pvb.DeepCopy()
pvb.Status.Path = resticDetails.path
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
pvb.Status.SnapshotID = snapshotID
@ -189,6 +192,10 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if emptySnapshot {
pvb.Status.Message = "volume was empty so no snapshot was taken"
}
if err = kube.Patch(ctx, original, &pvb, r.Client); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
return ctrl.Result{}, err
}
latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time)
latencySeconds := float64(latencyDuration / time.Second)
@ -280,15 +287,26 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l
// the PVB with the new progress.
func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
original := pvb.DeepCopy()
pvb.Status.Progress = progress
if err := kube.Patch(context.Background(), original, pvb, r.Client); err != nil {
log.WithError(err).Error("error update progress")
}
}
}
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string) (ctrl.Result, error) {
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = msg
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
return ctrl.Result{}, errors.Wrap(err, msg)
if err = kube.Patch(ctx, original, pvb, r.Client); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
type resticDetails struct {

View File

@ -179,16 +179,16 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("in progress phase pvb on same node should not be processed", request{
Entry("in progress phase pvb on same node should be marked as failed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Node("test_node").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expectedProcessed: true,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Result(),
expectedRequeue: ctrl.Result{},
}),

View File

@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
@ -101,24 +100,21 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
if they interfere with volumes being restored: %s index %d`, restic.InitContainer, restic.InitContainer, resticInitContainerIndex)
}
patchHelper, err := patch.NewHelper(pvr, c.Client)
if err != nil {
log.WithError(err).Error("Unable to new patch helper")
return ctrl.Result{}, err
}
log.Info("Restore starting")
original := pvr.DeepCopy()
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
pvr.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
if err = patchHelper.Patch(ctx, pvr); err != nil {
if err = kube.Patch(ctx, original, pvr, c.Client); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
}
if err = c.processRestore(ctx, pvr, pod, log); err != nil {
original = pvr.DeepCopy()
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = err.Error()
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if e := patchHelper.Patch(ctx, pvr); e != nil {
if e := kube.Patch(ctx, original, pvr, c.Client); e != nil {
log.WithError(err).Error("Unable to update status to failed")
}
@ -126,9 +122,10 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
original = pvr.DeepCopy()
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if err = patchHelper.Patch(ctx, pvr); err != nil {
if err = kube.Patch(ctx, original, pvr, c.Client); err != nil {
log.WithError(err).Error("Unable to update status to completed")
return ctrl.Result{}, err
}
@ -137,11 +134,6 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) {
if !isPVRNew(pvr) {
log.Debug("PodVolumeRestore is not new, skip")
return false, nil, nil
}
// we filter the pods during the initialization of cache, if we can get a pod here, the pod must be in the same node with the controller
// so we don't need to compare the node anymore
pod := &corev1api.Pod{}
@ -154,6 +146,28 @@ func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logr
return false, nil, err
}
// the status check must come after getting the PVR's pod: the pod lookup is what guarantees
// the PVR's pod is on the same node as the controller, and the controller should only process
// the PVRs on its own node
switch pvr.Status.Phase {
case "", velerov1api.PodVolumeRestorePhaseNew:
case velerov1api.PodVolumeRestorePhaseInProgress:
original := pvr.DeepCopy()
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = fmt.Sprintf("got a PodVolumeRestore with unexpected status %q, this may be due to a restart of the controller during the restoring, mark it as %q",
velerov1api.PodVolumeRestorePhaseInProgress, pvr.Status.Phase)
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if err := kube.Patch(ctx, original, pvr, c.Client); err != nil {
log.WithError(err).Error("Unable to update status to failed")
return false, nil, err
}
log.Warn(pvr.Status.Message)
return false, nil, nil
default:
log.Debug("PodVolumeRestore is not new or in-progress, skip")
return false, nil, nil
}
if !isResticInitContainerRunning(pod) {
log.Debug("Pod is not running restic-wait init container, skip")
return false, nil, nil
@ -163,8 +177,6 @@ func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logr
}
func (c *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
mgr.GetConfig()
// The pod may not being scheduled at the point when its PVRs are initially reconciled.
// By watching the pods, we can trigger the PVR reconciliation again once the pod is finally scheduled on the node.
return ctrl.NewControllerManagedBy(mgr).
@ -197,10 +209,6 @@ func (c *PodVolumeRestoreReconciler) findVolumeRestoresForPod(pod client.Object)
return requests
}
func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew
}
func isResticInitContainerRunning(pod *corev1api.Pod) bool {
// Restic wait container can be anywhere in the list of init containers, but must be running.
i := getResticInitContainerIndex(pod)
@ -337,13 +345,9 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
// the PVR with the new progress
func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
helper, err := patch.NewHelper(req, c.Client)
if err != nil {
log.WithError(err).Error("Unable to new patch helper")
return
}
original := req.DeepCopy()
req.Status.Progress = progress
if err = helper.Patch(context.Background(), req); err != nil {
if err := kube.Patch(context.Background(), original, req, c.Client); err != nil {
log.WithError(err).Error("Unable to update PodVolumeRestore progress")
}
}

View File

@ -21,20 +21,19 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/test"
)
func TestShouldProcess(t *testing.T) {
@ -45,34 +44,8 @@ func TestShouldProcess(t *testing.T) {
obj *velerov1api.PodVolumeRestore
pod *corev1api.Pod
shouldProcessed bool
expectedPhase velerov1api.PodVolumeRestorePhase
}{
{
name: "InProgress phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseInProgress,
},
},
shouldProcessed: false,
},
{
name: "Completed phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseCompleted,
},
},
shouldProcessed: false,
},
{
name: "Failed phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseFailed,
},
},
shouldProcessed: false,
},
{
name: "Unable to get pvr's pod should not be processed",
obj: &velerov1api.PodVolumeRestore{
@ -88,9 +61,89 @@ func TestShouldProcess(t *testing.T) {
},
shouldProcessed: false,
},
{
name: "InProgress phase pvr should be marked as failed",
obj: &velerov1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseInProgress,
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
},
},
shouldProcessed: false,
expectedPhase: velerov1api.PodVolumeRestorePhaseFailed,
},
{
name: "Completed phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseCompleted,
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
},
},
shouldProcessed: false,
},
{
name: "Failed phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseFailed,
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
},
},
shouldProcessed: false,
},
{
name: "Empty phase pvr with pod on node not running init container should not be processed",
obj: &velerov1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
@ -127,6 +180,10 @@ func TestShouldProcess(t *testing.T) {
{
name: "Empty phase pvr with pod on node running init container should be enqueued",
obj: &velerov1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvr-1",
},
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
@ -166,40 +223,37 @@ func TestShouldProcess(t *testing.T) {
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
builder := fake.NewClientBuilder()
if test.pod != nil {
builder.WithObjects(test.pod)
for _, ts := range tests {
t.Run(ts.name, func(t *testing.T) {
ctx := context.Background()
var objs []runtime.Object
if ts.obj != nil {
objs = append(objs, ts.obj)
}
if ts.pod != nil {
objs = append(objs, ts.pod)
}
cli := test.NewFakeControllerRuntimeClient(t, objs...)
c := &PodVolumeRestoreReconciler{
logger: logrus.New(),
Client: builder.Build(),
Client: cli,
clock: &clock.RealClock{},
}
shouldProcess, _, _ := c.shouldProcess(context.Background(), c.logger, test.obj)
require.Equal(t, test.shouldProcessed, shouldProcess)
shouldProcess, _, _ := c.shouldProcess(ctx, c.logger, ts.obj)
require.Equal(t, ts.shouldProcessed, shouldProcess)
if len(ts.expectedPhase) > 0 {
pvr := &velerov1api.PodVolumeRestore{}
err := c.Client.Get(ctx, types.NamespacedName{Namespace: ts.obj.Namespace, Name: ts.obj.Name}, pvr)
require.Nil(t, err)
assert.Equal(t, ts.expectedPhase, pvr.Status.Phase)
}
})
}
}
func TestIsPVRNew(t *testing.T) {
pvr := &velerov1api.PodVolumeRestore{}
expectationByStatus := map[velerov1api.PodVolumeRestorePhase]bool{
"": true,
velerov1api.PodVolumeRestorePhaseNew: true,
velerov1api.PodVolumeRestorePhaseInProgress: false,
velerov1api.PodVolumeRestorePhaseCompleted: false,
velerov1api.PodVolumeRestorePhaseFailed: false,
}
for phase, expected := range expectationByStatus {
pvr.Status.Phase = phase
assert.Equal(t, expected, isPVRNew(pvr))
}
}
func TestIsResticContainerRunning(t *testing.T) {
tests := []struct {
name string

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@ -239,3 +240,12 @@ func IsCRDReady(crd *unstructured.Unstructured) (bool, error) {
return false, fmt.Errorf("unable to handle CRD with version %s", ver)
}
}
// Patch the given object
func Patch(ctx context.Context, original, updated client.Object, client client.Client) error {
helper, err := patch.NewHelper(original, client)
if err != nil {
return err
}
return helper.Patch(ctx, updated)
}
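
Callers of the new helper follow a deep-copy-then-mutate pattern, as the controller changes above show. Here is a short illustrative sketch (the object and label names are arbitrary and not part of this PR); cluster-api's patch.Helper diffs the original against the mutated object, so only the changed fields are sent to the API server.

package example // illustrative only, not part of this PR

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// addLabel patches a single label onto an existing Pod: snapshot the current
// state with DeepCopy, mutate the live object, then hand both to kube.Patch.
func addLabel(ctx context.Context, cli client.Client, pod *corev1.Pod) error {
	original := pod.DeepCopy()
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels["example"] = "true"
	return kube.Patch(ctx, original, pod, cli)
}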

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
@ -425,3 +426,25 @@ func TestIsCRDReady(t *testing.T) {
_, err = IsCRDReady(obj)
assert.NotNil(t, err)
}
func TestPatch(t *testing.T) {
original := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod",
},
}
cli := fake.NewClientBuilder().WithObjects(original).Build()
updated := original.DeepCopy()
updated.SetLabels(map[string]string{"key": "value"})
ctx := context.Background()
err := Patch(ctx, original, updated, cli)
require.Nil(t, err)
pod := &corev1.Pod{}
err = cli.Get(ctx, types.NamespacedName{Namespace: "default", Name: "pod"}, pod)
require.Nil(t, err)
assert.Equal(t, 1, len(pod.GetLabels()))
}