/* Copyright The Velero Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "context" "fmt" "strings" "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/constant" "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/restorehelper" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util" "github.com/vmware-tanzu/velero/pkg/util/kube" ) func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ client: client, mgr: mgr, kubeClient: kubeClient, logger: logger.WithField("controller", "PodVolumeRestore"), nodeName: nodeName, clock: &clocks.RealClock{}, podResources: podResources, dataPathMgr: dataPathMgr, preparingTimeout: preparingTimeout, resourceTimeout: resourceTimeout, exposer: exposer.NewPodVolumeExposer(kubeClient, logger), cancelledPVR: make(map[string]time.Time), } } type PodVolumeRestoreReconciler struct { client client.Client mgr manager.Manager kubeClient kubernetes.Interface logger logrus.FieldLogger nodeName string clock clocks.WithTickerAndDelayedExecution podResources corev1api.ResourceRequirements exposer exposer.PodVolumeExposer dataPathMgr *datapath.Manager preparingTimeout time.Duration resourceTimeout time.Duration cancelledPVR map[string]time.Time } // +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch // +kubebuilder:rbac:groups="",resources=pods,verbs=get // +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get // +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.logger.WithField("PodVolumeRestore", req.NamespacedName.String()) log.Info("Reconciling PVR by advanced controller") pvr := &velerov1api.PodVolumeRestore{} if err := r.client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil { if apierrors.IsNotFound(err) { log.Warn("PVR not found, skip") return ctrl.Result{}, nil } log.WithError(err).Error("Unable to get the PVR") return ctrl.Result{}, err } log = log.WithField("pod", fmt.Sprintf("%s/%s", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)) if len(pvr.OwnerReferences) == 1 { log = log.WithField("restore", fmt.Sprintf("%s/%s", pvr.Namespace, pvr.OwnerReferences[0].Name)) } // Logic for clear resources when pvr been deleted if !isPVRInFinalState(pvr) { if !controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool { if controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { return false } controllerutil.AddFinalizer(pvr, PodVolumeFinalizer) return true }); err != nil { log.WithError(err).Errorf("failed to add finalizer for PVR %s/%s", pvr.Namespace, pvr.Name) return ctrl.Result{}, err } return ctrl.Result{}, nil } if !pvr.DeletionTimestamp.IsZero() { if !pvr.Spec.Cancel { log.Warnf("Cancel PVR under phase %s because it is being deleted", pvr.Status.Phase) if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool { if pvr.Spec.Cancel { return false } pvr.Spec.Cancel = true pvr.Status.Message = "Cancel PVR because it is being deleted" return true }); err != nil { log.WithError(err).Errorf("failed to set cancel flag for PVR %s/%s", pvr.Namespace, pvr.Name) return ctrl.Result{}, err } return ctrl.Result{}, nil } } } else { delete(r.cancelledPVR, pvr.Name) if controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool { if !controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { return false } controllerutil.RemoveFinalizer(pvr, PodVolumeFinalizer) return true }); err != nil { log.WithError(err).Error("error to remove finalizer") return ctrl.Result{}, err } return ctrl.Result{}, nil } } if pvr.Spec.Cancel { if spotted, found := r.cancelledPVR[pvr.Name]; !found { r.cancelledPVR[pvr.Name] = r.clock.Now() } else { delay := cancelDelayOthers if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress { delay = cancelDelayInProgress } if time.Since(spotted) > delay { log.Infof("PVR %s is canceled in Phase %s but not handled in rasonable time", pvr.GetName(), pvr.Status.Phase) if r.tryCancelPodVolumeRestore(ctx, pvr, "") { delete(r.cancelledPVR, pvr.Name) } return ctrl.Result{}, nil } } } if pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew { if pvr.Spec.Cancel { log.Infof("PVR %s is canceled in Phase %s", pvr.GetName(), pvr.Status.Phase) _ = r.tryCancelPodVolumeRestore(ctx, pvr, "") return ctrl.Result{}, nil } shouldProcess, pod, err := shouldProcess(ctx, r.client, log, pvr) if err != nil { return ctrl.Result{}, err } if !shouldProcess { return ctrl.Result{}, nil } log.Info("Accepting PVR") if err := r.acceptPodVolumeRestore(ctx, pvr); err != nil { return ctrl.Result{}, errors.Wrapf(err, "error accepting PVR %s", pvr.Name) } initContainerIndex := getInitContainerIndex(pod) if initContainerIndex > 0 { log.Warnf(`Init containers before the %s container may cause issues if they interfere with volumes being restored: %s index %d`, restorehelper.WaitInitContainer, restorehelper.WaitInitContainer, initContainerIndex) } log.Info("Exposing PVR") exposeParam := r.setupExposeParam(pvr) if err := r.exposer.Expose(ctx, getPVROwnerObject(pvr), exposeParam); err != nil { return r.errorOut(ctx, pvr, err, "error to expose PVR", log) } log.Info("PVR is exposed") return ctrl.Result{}, nil } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted { if peekErr := r.exposer.PeekExposed(ctx, getPVROwnerObject(pvr)); peekErr != nil { log.Errorf("Cancel PVR %s/%s because of expose error %s", pvr.Namespace, pvr.Name, peekErr) _ = r.tryCancelPodVolumeRestore(ctx, pvr, fmt.Sprintf("found a PVR %s/%s with expose error: %s. mark it as cancel", pvr.Namespace, pvr.Name, peekErr)) } else if pvr.Status.AcceptedTimestamp != nil { if time.Since(pvr.Status.AcceptedTimestamp.Time) >= r.preparingTimeout { r.onPrepareTimeout(ctx, pvr) } } return ctrl.Result{}, nil } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared { log.Infof("PVR is prepared and should be processed by %s (%s)", pvr.Status.Node, r.nodeName) if pvr.Status.Node != r.nodeName { return ctrl.Result{}, nil } if pvr.Spec.Cancel { log.Info("Prepared PVR is being canceled") r.OnDataPathCancelled(ctx, pvr.GetNamespace(), pvr.GetName()) return ctrl.Result{}, nil } asyncBR := r.dataPathMgr.GetAsyncBR(pvr.Name) if asyncBR != nil { log.Info("Cancellable data path is already started") return ctrl.Result{}, nil } res, err := r.exposer.GetExposed(ctx, getPVROwnerObject(pvr), r.client, r.nodeName, r.resourceTimeout) if err != nil { return r.errorOut(ctx, pvr, err, "exposed PVR is not ready", log) } else if res == nil { return r.errorOut(ctx, pvr, errors.New("no expose result is available for the current node"), "exposed PVR is not ready", log) } log.Info("Exposed PVR is ready and creating data path routine") callbacks := datapath.Callbacks{ OnCompleted: r.OnDataPathCompleted, OnFailed: r.OnDataPathFailed, OnCancelled: r.OnDataPathCancelled, OnProgress: r.OnDataPathProgress, } asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { log.Info("Data path instance is concurrent limited requeue later") return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } else { return r.errorOut(ctx, pvr, err, "error to create data path", log) } } if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { log.WithError(err).Errorf("Failed to init cancelable data path for %s", pvr.Name) r.closeDataPath(ctx, pvr.Name) return r.errorOut(ctx, pvr, err, "error initializing data path", log) } terminated := false if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if isPVRInFinalState(pvr) { terminated = true return false } pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} return true }); err != nil { log.WithError(err).Warnf("Failed to update PVR %s to InProgress, will data path close and retry", pvr.Name) r.closeDataPath(ctx, pvr.Name) return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } if terminated { log.Warnf("PVR %s is terminated during transition from prepared", pvr.Name) r.closeDataPath(ctx, pvr.Name) return ctrl.Result{}, nil } log.Info("PVR is marked as in progress") if err := r.startCancelableDataPath(asyncBR, pvr, res, log); err != nil { log.WithError(err).Errorf("Failed to start cancelable data path for %s", pvr.Name) r.closeDataPath(ctx, pvr.Name) return r.errorOut(ctx, pvr, err, "error starting data path", log) } return ctrl.Result{}, nil } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress { if pvr.Spec.Cancel { if pvr.Status.Node != r.nodeName { return ctrl.Result{}, nil } log.Info("PVR is being canceled") asyncBR := r.dataPathMgr.GetAsyncBR(pvr.Name) if asyncBR == nil { r.OnDataPathCancelled(ctx, pvr.GetNamespace(), pvr.GetName()) return ctrl.Result{}, nil } // Update status to Canceling if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if isPVRInFinalState(pvr) { log.Warnf("PVR %s is terminated, abort setting it to canceling", pvr.Name) return false } pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceling return true }); err != nil { log.WithError(err).Error("error updating PVR into canceling status") return ctrl.Result{}, err } asyncBR.Cancel() return ctrl.Result{}, nil } return ctrl.Result{}, nil } return ctrl.Result{}, nil } func (r *PodVolumeRestoreReconciler) acceptPodVolumeRestore(ctx context.Context, pvr *velerov1api.PodVolumeRestore) error { return UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, r.logger, func(pvr *velerov1api.PodVolumeRestore) bool { pvr.Status.AcceptedTimestamp = &metav1.Time{Time: r.clock.Now()} pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseAccepted pvr.Status.Node = r.nodeName return true }) } func (r *PodVolumeRestoreReconciler) tryCancelPodVolumeRestore(ctx context.Context, pvr *velerov1api.PodVolumeRestore, message string) bool { log := r.logger.WithField("PVR", pvr.Name) succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) { pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceled if pvr.Status.StartTimestamp.IsZero() { pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} } pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} if message != "" { pvr.Status.Message = message } }) if err != nil { log.WithError(err).Error("error updating PVR status") return false } else if !succeeded { log.Warn("conflict in updating PVR status and will try it again later") return false } r.exposer.CleanUp(ctx, getPVROwnerObject(pvr)) log.Warn("PVR is canceled") return true } var funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore func exclusiveUpdatePodVolumeRestore(ctx context.Context, cli client.Client, pvr *velerov1api.PodVolumeRestore, updateFunc func(*velerov1api.PodVolumeRestore)) (bool, error) { updateFunc(pvr) err := cli.Update(ctx, pvr) if err == nil { return true, nil } // warn we won't rollback pvr values in memory when error if apierrors.IsConflict(err) { return false, nil } else { return false, err } } func (r *PodVolumeRestoreReconciler) onPrepareTimeout(ctx context.Context, pvr *velerov1api.PodVolumeRestore) { log := r.logger.WithField("PVR", pvr.Name) log.Info("Timeout happened for preparing PVR") succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) { pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed pvr.Status.Message = "timeout on preparing PVR" }) if err != nil { log.WithError(err).Warn("Failed to update PVR") return } if !succeeded { log.Warn("PVR has been updated by others") return } diags := strings.Split(r.exposer.DiagnoseExpose(ctx, getPVROwnerObject(pvr)), "\n") for _, diag := range diags { log.Warnf("[Diagnose PVR expose]%s", diag) } r.exposer.CleanUp(ctx, getPVROwnerObject(pvr)) log.Info("PVR has been cleaned up") } func (r *PodVolumeRestoreReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { log.Info("Init cancelable PVR") if err := asyncBR.Init(ctx, nil); err != nil { return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async data path init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) return nil } func (r *PodVolumeRestoreReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, pvr *velerov1api.PodVolumeRestore, res *exposer.ExposeResult, log logrus.FieldLogger) error { log.Info("Start cancelable PVR") if err := asyncBR.StartRestore(pvr.Spec.SnapshotID, datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, pvr.Spec.UploaderSettings); err != nil { return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) return nil } func (r *PodVolumeRestoreReconciler) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { r.exposer.CleanUp(ctx, getPVROwnerObject(pvr)) return ctrl.Result{}, UpdatePVRStatusToFailed(ctx, r.client, pvr, err, msg, r.clock.Now(), log) } func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore, err error, msg string, time time.Time, log logrus.FieldLogger) error { log.Info("update PVR status to Failed") if patchErr := UpdatePVRWithRetry(context.Background(), c, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if isPVRInFinalState(pvr) { return false } pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed pvr.Status.Message = errors.WithMessage(err, msg).Error() pvr.Status.CompletionTimestamp = &metav1.Time{Time: time} return true }); patchErr != nil { log.WithError(patchErr).Warn("error updating PVR status") } return err } func shouldProcess(ctx context.Context, client client.Client, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) { if !isPVRNew(pvr) { log.Debug("PVR is not new, skip") return false, nil, nil } // we filter the pods during the initialization of cache, if we can get a pod here, the pod must be in the same node with the controller // so we don't need to compare the node anymore pod := &corev1api.Pod{} if err := client.Get(ctx, types.NamespacedName{Namespace: pvr.Spec.Pod.Namespace, Name: pvr.Spec.Pod.Name}, pod); err != nil { if apierrors.IsNotFound(err) { log.WithError(err).Debug("Pod not found on this node, skip") return false, nil, nil } log.WithError(err).Error("Unable to get pod") return false, nil, err } if !isInitContainerRunning(pod) { log.Debug("Pod is not running restore-wait init container, skip") return false, nil, nil } return true, pod, nil } func (r *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName string) { asyncBR := r.dataPathMgr.GetAsyncBR(pvrName) if asyncBR != nil { asyncBR.Close(ctx) } r.dataPathMgr.RemoveAsyncBR(pvrName) } func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { gp := kube.NewGenericEventPredicate(func(object client.Object) bool { pvr := object.(*velerov1api.PodVolumeRestore) if isLegacyPVR(pvr) { return false } if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted { return true } if pvr.Spec.Cancel && !isPVRInFinalState(pvr) { return true } if isPVRInFinalState(pvr) && !pvr.DeletionTimestamp.IsZero() { return true } return false }) s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerPodVolumeRestore), r.client, &velerov1api.PodVolumeRestoreList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{ Predicates: []predicate.Predicate{gp}, }) pred := kube.NewAllEventPredicate(func(obj client.Object) bool { pvr := obj.(*velerov1api.PodVolumeRestore) return !isLegacyPVR(pvr) }) return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.PodVolumeRestore{}, builder.WithPredicates(pred)). WatchesRawSource(s). Watches(&corev1api.Pod{}, handler.EnqueueRequestsFromMapFunc(r.findPVRForTargetPod)). Watches(&corev1api.Pod{}, kube.EnqueueRequestsFromMapUpdateFunc(r.findPVRForRestorePod), builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(ue event.UpdateEvent) bool { newObj := ue.ObjectNew.(*corev1api.Pod) if _, ok := newObj.Labels[velerov1api.PVRLabel]; !ok { return false } if newObj.Spec.NodeName == "" { return false } return true }, CreateFunc: func(event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return false }, GenericFunc: func(ge event.GenericEvent) bool { return false }, })). Complete(r) } func (r *PodVolumeRestoreReconciler) findPVRForTargetPod(ctx context.Context, pod client.Object) []reconcile.Request { list := &velerov1api.PodVolumeRestoreList{} options := &client.ListOptions{ LabelSelector: labels.Set(map[string]string{ velerov1api.PodUIDLabel: string(pod.GetUID()), }).AsSelector(), } if err := r.client.List(context.TODO(), list, options); err != nil { r.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err). Error("unable to list PodVolumeRestores") return []reconcile.Request{} } requests := []reconcile.Request{} for _, item := range list.Items { if isLegacyPVR(&item) { continue } requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: item.GetNamespace(), Name: item.GetName(), }, }) } return requests } func (r *PodVolumeRestoreReconciler) findPVRForRestorePod(ctx context.Context, podObj client.Object) []reconcile.Request { pod := podObj.(*corev1api.Pod) pvr, err := findPVRByRestorePod(r.client, *pod) log := r.logger.WithField("pod", pod.Name) if err != nil { log.WithError(err).Error("unable to get PVR") return []reconcile.Request{} } else if pvr == nil { log.Error("get empty PVR") return []reconcile.Request{} } log = log.WithFields(logrus.Fields{ "PVR": pvr.Name, }) if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseAccepted { return []reconcile.Request{} } if pod.Status.Phase == corev1api.PodRunning { log.Info("Preparing PVR") if err = UpdatePVRWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if isPVRInFinalState(pvr) { log.Warnf("PVR %s is terminated, abort setting it to prepared", pvr.Name) return false } pvr.Status.Phase = velerov1api.PodVolumeRestorePhasePrepared return true }); err != nil { log.WithError(err).Warn("failed to update PVR, prepare will halt for this PVR") return []reconcile.Request{} } } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { err := UpdatePVRWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if pvr.Spec.Cancel { return false } pvr.Spec.Cancel = true pvr.Status.Message = fmt.Sprintf("Cancel PVR because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason) return true }) if err != nil { log.WithError(err).Warn("failed to cancel PVR, and it will wait for prepare timeout") return []reconcile.Request{} } log.Infof("Exposed pod is in abnormal status(reason %s) and PVR is marked as cancel", reason) } else { return []reconcile.Request{} } request := reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: pvr.Namespace, Name: pvr.Name, }, } return []reconcile.Request{request} } func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool { return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew } func isInitContainerRunning(pod *corev1api.Pod) bool { // Pod volume wait container can be anywhere in the list of init containers, but must be running. i := getInitContainerIndex(pod) return i >= 0 && len(pod.Status.InitContainerStatuses)-1 >= i && pod.Status.InitContainerStatuses[i].State.Running != nil } func getInitContainerIndex(pod *corev1api.Pod) int { // Pod volume wait container can be anywhere in the list of init containers so locate it. for i, initContainer := range pod.Spec.InitContainers { if initContainer.Name == restorehelper.WaitInitContainer { return i } } return -1 } func (r *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { defer r.dataPathMgr.RemoveAsyncBR(pvrName) log := r.logger.WithField("PVR", pvrName) log.WithField("PVR", pvrName).WithField("result", result.Restore).Info("Async fs restore data path completed") var pvr velerov1api.PodVolumeRestore if err := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { log.WithError(err).Warn("Failed to get PVR on completion") return } log.Info("Cleaning up exposed environment") r.exposer.CleanUp(ctx, getPVROwnerObject(&pvr)) if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if isPVRInFinalState(pvr) { return false } pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} return true }); err != nil { log.WithError(err).Error("error updating PVR status") } else { log.Info("Restore completed") } } func (r *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { defer r.dataPathMgr.RemoveAsyncBR(pvrName) log := r.logger.WithField("PVR", pvrName) log.WithError(err).Error("Async fs restore data path failed") var pvr velerov1api.PodVolumeRestore if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { log.WithError(getErr).Warn("Failed to get PVR on failure") } else { _, _ = r.errorOut(ctx, &pvr, err, "data path restore failed", log) } } func (r *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { defer r.dataPathMgr.RemoveAsyncBR(pvrName) log := r.logger.WithField("PVR", pvrName) log.Warn("Async fs restore data path canceled") var pvr velerov1api.PodVolumeRestore if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { log.WithError(getErr).Warn("Failed to get PVR on cancel") return } // cleans up any objects generated during the snapshot expose r.exposer.CleanUp(ctx, getPVROwnerObject(&pvr)) if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { if isPVRInFinalState(pvr) { return false } pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceled if pvr.Status.StartTimestamp.IsZero() { pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} } pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} return true }); err != nil { log.WithError(err).Error("error updating PVR status on cancel") } else { delete(r.cancelledPVR, pvr.Name) } } func (r *PodVolumeRestoreReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) { log := r.logger.WithField("PVR", pvrName) if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: pvrName}, log, func(pvr *velerov1api.PodVolumeRestore) bool { pvr.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} return true }); err != nil { log.WithError(err).Error("Failed to update progress") } } func (r *PodVolumeRestoreReconciler) setupExposeParam(pvr *velerov1api.PodVolumeRestore) exposer.PodVolumeExposeParam { log := r.logger.WithField("PVR", pvr.Name) hostingPodLabels := map[string]string{velerov1api.PVRLabel: pvr.Name} for _, k := range util.ThirdPartyLabels { if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, pvr.Namespace, k, ""); err != nil { if err != nodeagent.ErrNodeAgentLabelNotFound { log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k) } } else { hostingPodLabels[k] = v } } hostingPodAnnotation := map[string]string{} for _, k := range util.ThirdPartyAnnotations { if v, err := nodeagent.GetAnnotationValue(context.Background(), r.kubeClient, pvr.Namespace, k, ""); err != nil { if err != nodeagent.ErrNodeAgentAnnotationNotFound { log.WithError(err).Warnf("Failed to check node-agent annotation, skip adding host pod annotation %s", k) } } else { hostingPodAnnotation[k] = v } } return exposer.PodVolumeExposeParam{ Type: exposer.PodVolumeExposeTypeRestore, ClientNamespace: pvr.Spec.Pod.Namespace, ClientPodName: pvr.Spec.Pod.Name, ClientPodVolume: pvr.Spec.Volume, HostingPodLabels: hostingPodLabels, HostingPodAnnotations: hostingPodAnnotation, OperationTimeout: r.resourceTimeout, Resources: r.podResources, } } func getPVROwnerObject(pvr *velerov1api.PodVolumeRestore) corev1api.ObjectReference { return corev1api.ObjectReference{ Kind: pvr.Kind, Namespace: pvr.Namespace, Name: pvr.Name, UID: pvr.UID, APIVersion: pvr.APIVersion, } } func findPVRByRestorePod(client client.Client, pod corev1api.Pod) (*velerov1api.PodVolumeRestore, error) { if label, exist := pod.Labels[velerov1api.PVRLabel]; exist { pvr := &velerov1api.PodVolumeRestore{} err := client.Get(context.Background(), types.NamespacedName{ Namespace: pod.Namespace, Name: label, }, pvr) if err != nil { return nil, errors.Wrapf(err, "error to find PVR by pod %s/%s", pod.Namespace, pod.Name) } return pvr, nil } return nil, nil } func isPVRInFinalState(pvr *velerov1api.PodVolumeRestore) bool { return pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted } func UpdatePVRWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov1api.PodVolumeRestore) bool) error { return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { pvr := &velerov1api.PodVolumeRestore{} if err := client.Get(ctx, namespacedName, pvr); err != nil { return false, errors.Wrap(err, "getting PVR") } if updateFunc(pvr) { err := client.Update(ctx, pvr) if err != nil { if apierrors.IsConflict(err) { log.Warnf("failed to update PVR for %s/%s and will retry it", pvr.Namespace, pvr.Name) return false, nil } else { return false, errors.Wrapf(err, "error updating PVR %s/%s", pvr.Namespace, pvr.Name) } } } return true, nil }) }