velero/pkg/controller/pod_volume_restore_controll...

911 lines
30 KiB
Go

/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/restorehelper"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements,
logger logrus.FieldLogger) *PodVolumeRestoreReconciler {
return &PodVolumeRestoreReconciler{
client: client,
mgr: mgr,
kubeClient: kubeClient,
logger: logger.WithField("controller", "PodVolumeRestore"),
nodeName: nodeName,
clock: &clocks.RealClock{},
podResources: podResources,
dataPathMgr: dataPathMgr,
preparingTimeout: preparingTimeout,
resourceTimeout: resourceTimeout,
exposer: exposer.NewPodVolumeExposer(kubeClient, logger),
cancelledPVR: make(map[string]time.Time),
}
}
type PodVolumeRestoreReconciler struct {
client client.Client
mgr manager.Manager
kubeClient kubernetes.Interface
logger logrus.FieldLogger
nodeName string
clock clocks.WithTickerAndDelayedExecution
podResources corev1api.ResourceRequirements
exposer exposer.PodVolumeExposer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
resourceTimeout time.Duration
cancelledPVR map[string]time.Time
}
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get
func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.logger.WithField("PodVolumeRestore", req.NamespacedName.String())
log.Info("Reconciling PVR by advanced controller")
pvr := &velerov1api.PodVolumeRestore{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil {
if apierrors.IsNotFound(err) {
log.Warn("PVR not found, skip")
return ctrl.Result{}, nil
}
log.WithError(err).Error("Unable to get the PVR")
return ctrl.Result{}, err
}
log = log.WithField("pod", fmt.Sprintf("%s/%s", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name))
if len(pvr.OwnerReferences) == 1 {
log = log.WithField("restore", fmt.Sprintf("%s/%s", pvr.Namespace, pvr.OwnerReferences[0].Name))
}
// Logic for clear resources when pvr been deleted
if !isPVRInFinalState(pvr) {
if !controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) {
if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) {
return false
}
controllerutil.AddFinalizer(pvr, PodVolumeFinalizer)
return true
}); err != nil {
log.WithError(err).Errorf("failed to add finalizer for PVR %s/%s", pvr.Namespace, pvr.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if !pvr.DeletionTimestamp.IsZero() {
if !pvr.Spec.Cancel {
log.Warnf("Cancel PVR under phase %s because it is being deleted", pvr.Status.Phase)
if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if pvr.Spec.Cancel {
return false
}
pvr.Spec.Cancel = true
pvr.Status.Message = "Cancel PVR because it is being deleted"
return true
}); err != nil {
log.WithError(err).Errorf("failed to set cancel flag for PVR %s/%s", pvr.Namespace, pvr.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
} else {
delete(r.cancelledPVR, pvr.Name)
if controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) {
if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if !controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) {
return false
}
controllerutil.RemoveFinalizer(pvr, PodVolumeFinalizer)
return true
}); err != nil {
log.WithError(err).Error("error to remove finalizer")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
if pvr.Spec.Cancel {
if spotted, found := r.cancelledPVR[pvr.Name]; !found {
r.cancelledPVR[pvr.Name] = r.clock.Now()
} else {
delay := cancelDelayOthers
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
delay = cancelDelayInProgress
}
if time.Since(spotted) > delay {
log.Infof("PVR %s is canceled in Phase %s but not handled in rasonable time", pvr.GetName(), pvr.Status.Phase)
if r.tryCancelPodVolumeRestore(ctx, pvr, "") {
delete(r.cancelledPVR, pvr.Name)
}
return ctrl.Result{}, nil
}
}
}
if pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew {
if pvr.Spec.Cancel {
log.Infof("PVR %s is canceled in Phase %s", pvr.GetName(), pvr.Status.Phase)
_ = r.tryCancelPodVolumeRestore(ctx, pvr, "")
return ctrl.Result{}, nil
}
shouldProcess, pod, err := shouldProcess(ctx, r.client, log, pvr)
if err != nil {
return ctrl.Result{}, err
}
if !shouldProcess {
return ctrl.Result{}, nil
}
log.Info("Accepting PVR")
if err := r.acceptPodVolumeRestore(ctx, pvr); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error accepting PVR %s", pvr.Name)
}
initContainerIndex := getInitContainerIndex(pod)
if initContainerIndex > 0 {
log.Warnf(`Init containers before the %s container may cause issues
if they interfere with volumes being restored: %s index %d`, restorehelper.WaitInitContainer, restorehelper.WaitInitContainer, initContainerIndex)
}
log.Info("Exposing PVR")
exposeParam := r.setupExposeParam(pvr)
if err := r.exposer.Expose(ctx, getPVROwnerObject(pvr), exposeParam); err != nil {
return r.errorOut(ctx, pvr, err, "error to expose PVR", log)
}
log.Info("PVR is exposed")
return ctrl.Result{}, nil
} else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted {
if peekErr := r.exposer.PeekExposed(ctx, getPVROwnerObject(pvr)); peekErr != nil {
log.Errorf("Cancel PVR %s/%s because of expose error %s", pvr.Namespace, pvr.Name, peekErr)
_ = r.tryCancelPodVolumeRestore(ctx, pvr, fmt.Sprintf("found a PVR %s/%s with expose error: %s. mark it as cancel", pvr.Namespace, pvr.Name, peekErr))
} else if pvr.Status.AcceptedTimestamp != nil {
if time.Since(pvr.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, pvr)
}
}
return ctrl.Result{}, nil
} else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared {
log.Infof("PVR is prepared and should be processed by %s (%s)", pvr.Status.Node, r.nodeName)
if pvr.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
if pvr.Spec.Cancel {
log.Info("Prepared PVR is being canceled")
r.OnDataPathCancelled(ctx, pvr.GetNamespace(), pvr.GetName())
return ctrl.Result{}, nil
}
asyncBR := r.dataPathMgr.GetAsyncBR(pvr.Name)
if asyncBR != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
res, err := r.exposer.GetExposed(ctx, getPVROwnerObject(pvr), r.client, r.nodeName, r.resourceTimeout)
if err != nil {
return r.errorOut(ctx, pvr, err, "exposed PVR is not ready", log)
} else if res == nil {
return r.errorOut(ctx, pvr, errors.New("no expose result is available for the current node"), "exposed PVR is not ready", log)
}
log.Info("Exposed PVR is ready and creating data path routine")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataPathCompleted,
OnFailed: r.OnDataPathFailed,
OnCancelled: r.OnDataPathCancelled,
OnProgress: r.OnDataPathProgress,
}
asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore,
pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else {
return r.errorOut(ctx, pvr, err, "error to create data path", log)
}
}
if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil {
log.WithError(err).Errorf("Failed to init cancelable data path for %s", pvr.Name)
r.closeDataPath(ctx, pvr.Name)
return r.errorOut(ctx, pvr, err, "error initializing data path", log)
}
terminated := false
if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if isPVRInFinalState(pvr) {
terminated = true
return false
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update PVR %s to InProgress, will data path close and retry", pvr.Name)
r.closeDataPath(ctx, pvr.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
if terminated {
log.Warnf("PVR %s is terminated during transition from prepared", pvr.Name)
r.closeDataPath(ctx, pvr.Name)
return ctrl.Result{}, nil
}
log.Info("PVR is marked as in progress")
if err := r.startCancelableDataPath(asyncBR, pvr, res, log); err != nil {
log.WithError(err).Errorf("Failed to start cancelable data path for %s", pvr.Name)
r.closeDataPath(ctx, pvr.Name)
return r.errorOut(ctx, pvr, err, "error starting data path", log)
}
return ctrl.Result{}, nil
} else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
if pvr.Spec.Cancel {
if pvr.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
log.Info("PVR is being canceled")
asyncBR := r.dataPathMgr.GetAsyncBR(pvr.Name)
if asyncBR == nil {
r.OnDataPathCancelled(ctx, pvr.GetNamespace(), pvr.GetName())
return ctrl.Result{}, nil
}
// Update status to Canceling
if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if isPVRInFinalState(pvr) {
log.Warnf("PVR %s is terminated, abort setting it to canceling", pvr.Name)
return false
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceling
return true
}); err != nil {
log.WithError(err).Error("error updating PVR into canceling status")
return ctrl.Result{}, err
}
asyncBR.Cancel()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
func (r *PodVolumeRestoreReconciler) acceptPodVolumeRestore(ctx context.Context, pvr *velerov1api.PodVolumeRestore) error {
return UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, r.logger, func(pvr *velerov1api.PodVolumeRestore) bool {
pvr.Status.AcceptedTimestamp = &metav1.Time{Time: r.clock.Now()}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseAccepted
pvr.Status.Node = r.nodeName
return true
})
}
func (r *PodVolumeRestoreReconciler) tryCancelPodVolumeRestore(ctx context.Context, pvr *velerov1api.PodVolumeRestore, message string) bool {
log := r.logger.WithField("PVR", pvr.Name)
succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) {
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceled
if pvr.Status.StartTimestamp.IsZero() {
pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
}
pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
if message != "" {
pvr.Status.Message = message
}
})
if err != nil {
log.WithError(err).Error("error updating PVR status")
return false
} else if !succeeded {
log.Warn("conflict in updating PVR status and will try it again later")
return false
}
r.exposer.CleanUp(ctx, getPVROwnerObject(pvr))
log.Warn("PVR is canceled")
return true
}
var funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore
func exclusiveUpdatePodVolumeRestore(ctx context.Context, cli client.Client, pvr *velerov1api.PodVolumeRestore,
updateFunc func(*velerov1api.PodVolumeRestore)) (bool, error) {
updateFunc(pvr)
err := cli.Update(ctx, pvr)
if err == nil {
return true, nil
}
// warn we won't rollback pvr values in memory when error
if apierrors.IsConflict(err) {
return false, nil
} else {
return false, err
}
}
func (r *PodVolumeRestoreReconciler) onPrepareTimeout(ctx context.Context, pvr *velerov1api.PodVolumeRestore) {
log := r.logger.WithField("PVR", pvr.Name)
log.Info("Timeout happened for preparing PVR")
succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) {
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = "timeout on preparing PVR"
})
if err != nil {
log.WithError(err).Warn("Failed to update PVR")
return
}
if !succeeded {
log.Warn("PVR has been updated by others")
return
}
diags := strings.Split(r.exposer.DiagnoseExpose(ctx, getPVROwnerObject(pvr)), "\n")
for _, diag := range diags {
log.Warnf("[Diagnose PVR expose]%s", diag)
}
r.exposer.CleanUp(ctx, getPVROwnerObject(pvr))
log.Info("PVR has been cleaned up")
}
func (r *PodVolumeRestoreReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Init cancelable PVR")
if err := asyncBR.Init(ctx, nil); err != nil {
return errors.Wrap(err, "error initializing asyncBR")
}
log.Infof("async data path init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}
func (r *PodVolumeRestoreReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, pvr *velerov1api.PodVolumeRestore, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Start cancelable PVR")
if err := asyncBR.StartRestore(pvr.Spec.SnapshotID, datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, pvr.Spec.UploaderSettings); err != nil {
return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
}
log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}
func (r *PodVolumeRestoreReconciler) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
r.exposer.CleanUp(ctx, getPVROwnerObject(pvr))
return ctrl.Result{}, UpdatePVRStatusToFailed(ctx, r.client, pvr, err, msg, r.clock.Now(), log)
}
func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore, err error, msg string, time time.Time, log logrus.FieldLogger) error {
log.Info("update PVR status to Failed")
if patchErr := UpdatePVRWithRetry(context.Background(), c, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log,
func(pvr *velerov1api.PodVolumeRestore) bool {
if isPVRInFinalState(pvr) {
return false
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = errors.WithMessage(err, msg).Error()
pvr.Status.CompletionTimestamp = &metav1.Time{Time: time}
return true
}); patchErr != nil {
log.WithError(patchErr).Warn("error updating PVR status")
}
return err
}
func shouldProcess(ctx context.Context, client client.Client, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) {
if !isPVRNew(pvr) {
log.Debug("PVR is not new, skip")
return false, nil, nil
}
// we filter the pods during the initialization of cache, if we can get a pod here, the pod must be in the same node with the controller
// so we don't need to compare the node anymore
pod := &corev1api.Pod{}
if err := client.Get(ctx, types.NamespacedName{Namespace: pvr.Spec.Pod.Namespace, Name: pvr.Spec.Pod.Name}, pod); err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debug("Pod not found on this node, skip")
return false, nil, nil
}
log.WithError(err).Error("Unable to get pod")
return false, nil, err
}
if !isInitContainerRunning(pod) {
log.Debug("Pod is not running restore-wait init container, skip")
return false, nil, nil
}
return true, pod, nil
}
func (r *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName string) {
asyncBR := r.dataPathMgr.GetAsyncBR(pvrName)
if asyncBR != nil {
asyncBR.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(pvrName)
}
func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
pvr := object.(*velerov1api.PodVolumeRestore)
if isLegacyPVR(pvr) {
return false
}
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted {
return true
}
if pvr.Spec.Cancel && !isPVRInFinalState(pvr) {
return true
}
if isPVRInFinalState(pvr) && !pvr.DeletionTimestamp.IsZero() {
return true
}
return false
})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerPodVolumeRestore), r.client, &velerov1api.PodVolumeRestoreList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{
Predicates: []predicate.Predicate{gp},
})
pred := kube.NewAllEventPredicate(func(obj client.Object) bool {
pvr := obj.(*velerov1api.PodVolumeRestore)
return !isLegacyPVR(pvr)
})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.PodVolumeRestore{}, builder.WithPredicates(pred)).
WatchesRawSource(s).
Watches(&corev1api.Pod{}, handler.EnqueueRequestsFromMapFunc(r.findPVRForTargetPod)).
Watches(&corev1api.Pod{}, kube.EnqueueRequestsFromMapUpdateFunc(r.findPVRForRestorePod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
newObj := ue.ObjectNew.(*corev1api.Pod)
if _, ok := newObj.Labels[velerov1api.PVRLabel]; !ok {
return false
}
if newObj.Spec.NodeName == "" {
return false
}
return true
},
CreateFunc: func(event.CreateEvent) bool {
return false
},
DeleteFunc: func(de event.DeleteEvent) bool {
return false
},
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
})).
Complete(r)
}
func (r *PodVolumeRestoreReconciler) findPVRForTargetPod(ctx context.Context, pod client.Object) []reconcile.Request {
list := &velerov1api.PodVolumeRestoreList{}
options := &client.ListOptions{
LabelSelector: labels.Set(map[string]string{
velerov1api.PodUIDLabel: string(pod.GetUID()),
}).AsSelector(),
}
if err := r.client.List(context.TODO(), list, options); err != nil {
r.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err).
Error("unable to list PodVolumeRestores")
return []reconcile.Request{}
}
requests := []reconcile.Request{}
for _, item := range list.Items {
if isLegacyPVR(&item) {
continue
}
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: item.GetNamespace(),
Name: item.GetName(),
},
})
}
return requests
}
func (r *PodVolumeRestoreReconciler) findPVRForRestorePod(ctx context.Context, podObj client.Object) []reconcile.Request {
pod := podObj.(*corev1api.Pod)
pvr, err := findPVRByRestorePod(r.client, *pod)
log := r.logger.WithField("pod", pod.Name)
if err != nil {
log.WithError(err).Error("unable to get PVR")
return []reconcile.Request{}
} else if pvr == nil {
log.Error("get empty PVR")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"PVR": pvr.Name,
})
if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseAccepted {
return []reconcile.Request{}
}
if pod.Status.Phase == corev1api.PodRunning {
log.Info("Preparing PVR")
if err = UpdatePVRWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log,
func(pvr *velerov1api.PodVolumeRestore) bool {
if isPVRInFinalState(pvr) {
log.Warnf("PVR %s is terminated, abort setting it to prepared", pvr.Name)
return false
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhasePrepared
return true
}); err != nil {
log.WithError(err).Warn("failed to update PVR, prepare will halt for this PVR")
return []reconcile.Request{}
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
err := UpdatePVRWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log,
func(pvr *velerov1api.PodVolumeRestore) bool {
if pvr.Spec.Cancel {
return false
}
pvr.Spec.Cancel = true
pvr.Status.Message = fmt.Sprintf("Cancel PVR because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
return true
})
if err != nil {
log.WithError(err).Warn("failed to cancel PVR, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Infof("Exposed pod is in abnormal status(reason %s) and PVR is marked as cancel", reason)
} else {
return []reconcile.Request{}
}
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: pvr.Namespace,
Name: pvr.Name,
},
}
return []reconcile.Request{request}
}
func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew
}
func isInitContainerRunning(pod *corev1api.Pod) bool {
// Pod volume wait container can be anywhere in the list of init containers, but must be running.
i := getInitContainerIndex(pod)
return i >= 0 &&
len(pod.Status.InitContainerStatuses)-1 >= i &&
pod.Status.InitContainerStatuses[i].State.Running != nil
}
func getInitContainerIndex(pod *corev1api.Pod) int {
// Pod volume wait container can be anywhere in the list of init containers so locate it.
for i, initContainer := range pod.Spec.InitContainers {
if initContainer.Name == restorehelper.WaitInitContainer {
return i
}
}
return -1
}
func (r *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
defer r.dataPathMgr.RemoveAsyncBR(pvrName)
log := r.logger.WithField("PVR", pvrName)
log.WithField("PVR", pvrName).WithField("result", result.Restore).Info("Async fs restore data path completed")
var pvr velerov1api.PodVolumeRestore
if err := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil {
log.WithError(err).Warn("Failed to get PVR on completion")
return
}
log.Info("Cleaning up exposed environment")
r.exposer.CleanUp(ctx, getPVROwnerObject(&pvr))
if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if isPVRInFinalState(pvr) {
return false
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
return true
}); err != nil {
log.WithError(err).Error("error updating PVR status")
} else {
log.Info("Restore completed")
}
}
func (r *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) {
defer r.dataPathMgr.RemoveAsyncBR(pvrName)
log := r.logger.WithField("PVR", pvrName)
log.WithError(err).Error("Async fs restore data path failed")
var pvr velerov1api.PodVolumeRestore
if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVR on failure")
} else {
_, _ = r.errorOut(ctx, &pvr, err, "data path restore failed", log)
}
}
func (r *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) {
defer r.dataPathMgr.RemoveAsyncBR(pvrName)
log := r.logger.WithField("PVR", pvrName)
log.Warn("Async fs restore data path canceled")
var pvr velerov1api.PodVolumeRestore
if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVR on cancel")
return
}
// cleans up any objects generated during the snapshot expose
r.exposer.CleanUp(ctx, getPVROwnerObject(&pvr))
if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool {
if isPVRInFinalState(pvr) {
return false
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceled
if pvr.Status.StartTimestamp.IsZero() {
pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
}
pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
return true
}); err != nil {
log.WithError(err).Error("error updating PVR status on cancel")
} else {
delete(r.cancelledPVR, pvr.Name)
}
}
func (r *PodVolumeRestoreReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
log := r.logger.WithField("PVR", pvrName)
if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: pvrName}, log, func(pvr *velerov1api.PodVolumeRestore) bool {
pvr.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
return true
}); err != nil {
log.WithError(err).Error("Failed to update progress")
}
}
func (r *PodVolumeRestoreReconciler) setupExposeParam(pvr *velerov1api.PodVolumeRestore) exposer.PodVolumeExposeParam {
log := r.logger.WithField("PVR", pvr.Name)
hostingPodLabels := map[string]string{velerov1api.PVRLabel: pvr.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, pvr.Namespace, k, ""); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
}
}
hostingPodAnnotation := map[string]string{}
for _, k := range util.ThirdPartyAnnotations {
if v, err := nodeagent.GetAnnotationValue(context.Background(), r.kubeClient, pvr.Namespace, k, ""); err != nil {
if err != nodeagent.ErrNodeAgentAnnotationNotFound {
log.WithError(err).Warnf("Failed to check node-agent annotation, skip adding host pod annotation %s", k)
}
} else {
hostingPodAnnotation[k] = v
}
}
return exposer.PodVolumeExposeParam{
Type: exposer.PodVolumeExposeTypeRestore,
ClientNamespace: pvr.Spec.Pod.Namespace,
ClientPodName: pvr.Spec.Pod.Name,
ClientPodVolume: pvr.Spec.Volume,
HostingPodLabels: hostingPodLabels,
HostingPodAnnotations: hostingPodAnnotation,
OperationTimeout: r.resourceTimeout,
Resources: r.podResources,
}
}
func getPVROwnerObject(pvr *velerov1api.PodVolumeRestore) corev1api.ObjectReference {
return corev1api.ObjectReference{
Kind: pvr.Kind,
Namespace: pvr.Namespace,
Name: pvr.Name,
UID: pvr.UID,
APIVersion: pvr.APIVersion,
}
}
func findPVRByRestorePod(client client.Client, pod corev1api.Pod) (*velerov1api.PodVolumeRestore, error) {
if label, exist := pod.Labels[velerov1api.PVRLabel]; exist {
pvr := &velerov1api.PodVolumeRestore{}
err := client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: label,
}, pvr)
if err != nil {
return nil, errors.Wrapf(err, "error to find PVR by pod %s/%s", pod.Namespace, pod.Name)
}
return pvr, nil
}
return nil, nil
}
func isPVRInFinalState(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed ||
pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled ||
pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted
}
func UpdatePVRWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov1api.PodVolumeRestore) bool) error {
return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) {
pvr := &velerov1api.PodVolumeRestore{}
if err := client.Get(ctx, namespacedName, pvr); err != nil {
return false, errors.Wrap(err, "getting PVR")
}
if updateFunc(pvr) {
err := client.Update(ctx, pvr)
if err != nil {
if apierrors.IsConflict(err) {
log.Warnf("failed to update PVR for %s/%s and will retry it", pvr.Namespace, pvr.Name)
return false, nil
} else {
return false, errors.Wrapf(err, "error updating PVR %s/%s", pvr.Namespace, pvr.Name)
}
}
}
return true, nil
})
}