/*
Copyright The Velero Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
    "context"
    "fmt"
    "strings"
    "time"

    snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    corev1api "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    clocks "k8s.io/utils/clock"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    "github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
    velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
    "github.com/vmware-tanzu/velero/pkg/constant"
    "github.com/vmware-tanzu/velero/pkg/datamover"
    "github.com/vmware-tanzu/velero/pkg/datapath"
    "github.com/vmware-tanzu/velero/pkg/exposer"
    "github.com/vmware-tanzu/velero/pkg/metrics"
    "github.com/vmware-tanzu/velero/pkg/nodeagent"
    "github.com/vmware-tanzu/velero/pkg/uploader"
    "github.com/vmware-tanzu/velero/pkg/util"
    "github.com/vmware-tanzu/velero/pkg/util/kube"
)

const (
    dataUploadDownloadRequestor = "snapshot-data-upload-download"
    DataUploadDownloadFinalizer = "velero.io/data-upload-download-finalizer"
    preparingMonitorFrequency   = time.Minute
    cancelDelayInProgress       = time.Hour
    cancelDelayOthers           = time.Minute * 5
)

// DataUploadReconciler reconciles a DataUpload object
type DataUploadReconciler struct {
    client              client.Client
    kubeClient          kubernetes.Interface
    csiSnapshotClient   snapshotter.SnapshotV1Interface
    mgr                 manager.Manager
    Clock               clocks.WithTickerAndDelayedExecution
    nodeName            string
    logger              logrus.FieldLogger
    snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
    dataPathMgr         *datapath.Manager
    loadAffinity        *kube.LoadAffinity
    backupPVCConfig     map[string]nodeagent.BackupPVC
    podResources        corev1api.ResourceRequirements
    preparingTimeout    time.Duration
    metrics             *metrics.ServerMetrics
    cancelledDataUpload map[string]time.Time
}

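// NewDataUploadReconciler returns a DataUploadReconciler wired with the given clients, data path
// manager, exposer configuration, clock, and metrics.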
func NewDataUploadReconciler(
    client client.Client,
    mgr manager.Manager,
    kubeClient kubernetes.Interface,
    csiSnapshotClient snapshotter.SnapshotV1Interface,
    dataPathMgr *datapath.Manager,
    loadAffinity *kube.LoadAffinity,
    backupPVCConfig map[string]nodeagent.BackupPVC,
    podResources corev1api.ResourceRequirements,
    clock clocks.WithTickerAndDelayedExecution,
    nodeName string,
    preparingTimeout time.Duration,
    log logrus.FieldLogger,
    metrics *metrics.ServerMetrics,
) *DataUploadReconciler {
    return &DataUploadReconciler{
        client:            client,
        mgr:               mgr,
        kubeClient:        kubeClient,
        csiSnapshotClient: csiSnapshotClient,
        Clock:             clock,
        nodeName:          nodeName,
        logger:            log,
        snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{
            velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(
                kubeClient,
                csiSnapshotClient,
                log,
            ),
        },
        dataPathMgr:         dataPathMgr,
        loadAffinity:        loadAffinity,
        backupPVCConfig:     backupPVCConfig,
        podResources:        podResources,
        preparingTimeout:    preparingTimeout,
        metrics:             metrics,
        cancelledDataUpload: make(map[string]time.Time),
    }
}

// +kubebuilder:rbac:groups=velero.io,resources=datauploads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=datauploads/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get

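// Reconcile drives a DataUpload through its lifecycle: it manages the finalizer, handles deletion
// and cancellation, and advances the CR through the New, Accepted, Prepared and InProgress phases.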
func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.logger.WithFields(logrus.Fields{
        "controller": "dataupload",
        "dataupload": req.NamespacedName,
    })
    log.Infof("Reconcile %s", req.Name)
    du := &velerov2alpha1api.DataUpload{}
    if err := r.client.Get(ctx, req.NamespacedName, du); err != nil {
        if apierrors.IsNotFound(err) {
            log.Debug("Unable to find DataUpload")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, errors.Wrap(err, "getting DataUpload")
    }

    if !datamover.IsBuiltInUploader(du.Spec.DataMover) {
        log.WithField("Data mover", du.Spec.DataMover).Debug("The data mover is not built-in, so it is not supported by Velero")
        return ctrl.Result{}, nil
    }

    // Clean up internal resources when the DataUpload is deleted
    if !isDataUploadInFinalState(du) {
        if !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
            if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
                if controllerutil.ContainsFinalizer(dataUpload, DataUploadDownloadFinalizer) {
                    return false
                }

                controllerutil.AddFinalizer(dataUpload, DataUploadDownloadFinalizer)

                return true
            }); err != nil {
                log.WithError(err).Errorf("failed to add finalizer for du %s/%s", du.Namespace, du.Name)
                return ctrl.Result{}, err
            }

            return ctrl.Result{}, nil
        }

        if !du.DeletionTimestamp.IsZero() {
            if !du.Spec.Cancel {
                // When the CR is deleted, the internal resources created by Velero need to be cleaned up. The cancel
                // mechanism is used for the clean-up rather than deleting them directly, to avoid conflicts with the
                // expose action.
                log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase)

                if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
                    if dataUpload.Spec.Cancel {
                        return false
                    }

                    dataUpload.Spec.Cancel = true
                    dataUpload.Status.Message = "Cancel dataupload because it is being deleted"

                    return true
                }); err != nil {
                    log.WithError(err).Errorf("failed to set cancel flag for du %s/%s", du.Namespace, du.Name)
                    return ctrl.Result{}, err
                }

                return ctrl.Result{}, nil
            }
        }
    } else {
        delete(r.cancelledDataUpload, du.Name)

        // The finalizer removal happens here because every CR eventually reaches a final state, so the check and the
        // removal can be done in the final state rather than in an intermediate one.
        // The finalizer is removed whether or not the CR is being deleted: it is no longer needed once the internal
        // resources are all cleaned up, and a CR in a final state then won't block a direct delete of the velero namespace.
        if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
            if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(du *velerov2alpha1api.DataUpload) bool {
                if !controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) {
                    return false
                }

                controllerutil.RemoveFinalizer(du, DataUploadDownloadFinalizer)

                return true
            }); err != nil {
                log.WithError(err).Error("error to remove finalizer")
                return ctrl.Result{}, err
            }

            return ctrl.Result{}, nil
        }
    }

    if du.Spec.Cancel {
        if spotted, found := r.cancelledDataUpload[du.Name]; !found {
            r.cancelledDataUpload[du.Name] = r.Clock.Now()
        } else {
            delay := cancelDelayOthers
            if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
                delay = cancelDelayInProgress
            }

            if time.Since(spotted) > delay {
                log.Infof("Data upload %s is canceled in Phase %s but not handled in a reasonable time", du.GetName(), du.Status.Phase)
                if r.tryCancelDataUpload(ctx, du, "") {
                    delete(r.cancelledDataUpload, du.Name)
                }

                return ctrl.Result{}, nil
            }
        }
    }

    ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
    if !ok {
        return r.errorOut(ctx, du, errors.Errorf("%s type of snapshot exposer does not exist", du.Spec.SnapshotType), "unknown type of exposer", log)
    }

    if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew {
        log.Info("Data upload starting")

        accepted, err := r.acceptDataUpload(ctx, du)
        if err != nil {
            return ctrl.Result{}, errors.Wrapf(err, "error accepting the data upload %s", du.Name)
        }

        if !accepted {
            log.Debug("Data upload is not accepted")
            return ctrl.Result{}, nil
        }

        log.Info("Data upload is accepted")

        if du.Spec.Cancel {
            r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
            return ctrl.Result{}, nil
        }

        exposeParam, err := r.setupExposeParam(du)
        if err != nil {
            return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
        }

        // Expose() triggers the creation of a pod whose volume is restored from the given volume snapshot. The pod may
        // not be scheduled to the same node as the current controller, so we return here and let only the controller
        // running on the pod's node do the rest of the work.
        if err := ep.Expose(ctx, getOwnerObject(du), exposeParam); err != nil {
            return r.errorOut(ctx, du, err, "error exposing snapshot", log)
        }

        log.Info("Snapshot is exposed")

        return ctrl.Result{}, nil
    } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
        if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil {
            r.tryCancelDataUpload(ctx, du, fmt.Sprintf("found a du %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr))
            log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr)
        } else if du.Status.AcceptedTimestamp != nil {
            if time.Since(du.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
                r.onPrepareTimeout(ctx, du)
            }
        }

        return ctrl.Result{}, nil
    } else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
        log.Infof("Data upload is prepared and should be processed by %s (%s)", du.Status.Node, r.nodeName)

        if du.Status.Node != r.nodeName {
            return ctrl.Result{}, nil
        }

        if du.Spec.Cancel {
            log.Info("Prepared data upload is being canceled")
            r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
            return ctrl.Result{}, nil
        }

        asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
        if asyncBR != nil {
            log.Info("Cancellable data path is already started")
            return ctrl.Result{}, nil
        }
        waitExposePara := r.setupWaitExposePara(du)
        res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara)
        if err != nil {
            return r.errorOut(ctx, du, err, "exposed snapshot is not ready", log)
        } else if res == nil {
            return r.errorOut(ctx, du, errors.New("no expose result is available for the current node"), "exposed snapshot is not ready", log)
        }

        if res.ByPod.NodeOS == nil {
            return r.errorOut(ctx, du, errors.New("unsupported ambiguous node OS"), "invalid expose result", log)
        }

        log.Info("Exposed snapshot is ready and creating data path routine")

        // Create the file system BR and get the data path instance first, then update the data upload status
        callbacks := datapath.Callbacks{
            OnCompleted: r.OnDataUploadCompleted,
            OnFailed:    r.OnDataUploadFailed,
            OnCancelled: r.OnDataUploadCancelled,
            OnProgress:  r.OnDataUploadProgress,
        }

        asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup,
            du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, false, log)
        if err != nil {
            if err == datapath.ConcurrentLimitExceed {
                log.Info("Data path instance is concurrency limited, requeue later")
                return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
            } else {
                return r.errorOut(ctx, du, err, "error to create data path", log)
            }
        }

        if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil {
            log.WithError(err).Errorf("Failed to init cancelable data path for %s", du.Name)

            r.closeDataPath(ctx, du.Name)
            return r.errorOut(ctx, du, err, "error initializing data path", log)
        }

        // Update status to InProgress
        terminated := false
        if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
            if isDataUploadInFinalState(du) {
                terminated = true
                return false
            }

            du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
            du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
            du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS)

            return true
        }); err != nil {
            log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will close the data path and retry", du.Name)

            r.closeDataPath(ctx, du.Name)
            return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
        }

        if terminated {
            log.Warnf("dataupload %s is terminated during transition from prepared", du.Name)
            r.closeDataPath(ctx, du.Name)
            return ctrl.Result{}, nil
        }

        log.Info("Data upload is marked as in progress")

        if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil {
            log.WithError(err).Errorf("Failed to start cancelable data path for %s", du.Name)
            r.closeDataPath(ctx, du.Name)

            return r.errorOut(ctx, du, err, "error starting data path", log)
        }

        return ctrl.Result{}, nil
    } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
        if du.Spec.Cancel {
            if du.Status.Node != r.nodeName {
                return ctrl.Result{}, nil
            }

            log.Info("In progress data upload is being canceled")

            asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
            if asyncBR == nil {
                r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
                return ctrl.Result{}, nil
            }

            // Update status to Canceling
            if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
                if isDataUploadInFinalState(du) {
                    log.Warnf("dataupload %s is terminated, abort setting it to canceling", du.Name)
                    return false
                }

                du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling
                return true
            }); err != nil {
                log.WithError(err).Error("error updating data upload into canceling status")
                return ctrl.Result{}, err
            }

            asyncBR.Cancel()
            return ctrl.Result{}, nil
        }
    }

    return ctrl.Result{}, nil
}

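// initCancelableDataPath initializes the async backup/restore (asyncBR) instance for the exposed volume.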
func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
    log.Info("Init cancelable dataUpload")

    if err := asyncBR.Init(ctx, nil); err != nil {
        return errors.Wrap(err, "error initializing asyncBR")
    }

    log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)

    return nil
}

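// startCancelableDataPath starts the async backup against the exposed volume of the hosting pod.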
func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) error {
    log.Info("Start cancelable dataUpload")

    if err := asyncBR.StartBackup(datapath.AccessPoint{
        ByPath: res.ByPod.VolumeName,
    }, du.Spec.DataMoverConfig, nil); err != nil {
        return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
    }

    log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
    return nil
}

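// OnDataUploadCompleted is the data path completion callback: it cleans up the expose resources
// and marks the DataUpload as Completed with the path and snapshot ID.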
func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) {
    defer r.dataPathMgr.RemoveAsyncBR(duName)

    log := r.logger.WithField("dataupload", duName)

    log.Info("Async fs backup data path completed")

    var du velerov2alpha1api.DataUpload
    if err := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); err != nil {
        log.WithError(err).Warn("Failed to get dataupload on completion")
        return
    }

    // clean up any objects generated during the snapshot expose
    ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
    if !ok {
        log.WithError(fmt.Errorf("%v type of snapshot exposer does not exist", du.Spec.SnapshotType)).
            Warn("Failed to clean up resources on completion")
    } else {
        var volumeSnapshotName string
        if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposers should have their own conditions
            volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot
        }
        ep.CleanUp(ctx, getOwnerObject(&du), volumeSnapshotName, du.Spec.SourceNamespace)
    }

    // Update status to Completed with path & snapshot ID.
    if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
        if isDataUploadInFinalState(du) {
            return false
        }

        du.Status.Path = result.Backup.Source.ByPath
        du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
        du.Status.SnapshotID = result.Backup.SnapshotID
        du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
        if result.Backup.EmptySnapshot {
            du.Status.Message = "volume was empty so no data was uploaded"
        }

        return true
    }); err != nil {
        log.WithError(err).Error("error updating DataUpload status")
    } else {
        log.Info("Data upload completed")
        r.metrics.RegisterDataUploadSuccess(r.nodeName)
    }
}

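// OnDataUploadFailed is the data path failure callback: it moves the DataUpload to the Failed phase.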
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) {
    defer r.dataPathMgr.RemoveAsyncBR(duName)

    log := r.logger.WithField("dataupload", duName)

    log.WithError(err).Error("Async fs backup data path failed")

    var du velerov2alpha1api.DataUpload
    if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil {
        log.WithError(getErr).Warn("Failed to get dataupload on failure")
    } else {
        _, _ = r.errorOut(ctx, &du, err, "data path backup failed", log)
    }
}

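// OnDataUploadCancelled is the data path cancellation callback: it cleans up the expose resources
// and marks the DataUpload as Canceled.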
func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) {
    defer r.dataPathMgr.RemoveAsyncBR(duName)

    log := r.logger.WithField("dataupload", duName)

    log.Warn("Async fs backup data path canceled")

    du := &velerov2alpha1api.DataUpload{}
    if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, du); getErr != nil {
        log.WithError(getErr).Warn("Failed to get dataupload on cancel")
        return
    }
    // clean up any objects generated during the snapshot expose
    r.cleanUp(ctx, du, log)

    if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
        if isDataUploadInFinalState(du) {
            return false
        }

        du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
        if du.Status.StartTimestamp.IsZero() {
            du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
        }
        du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

        return true
    }); err != nil {
        log.WithError(err).Error("error updating DataUpload status")
    } else {
        r.metrics.RegisterDataUploadCancel(r.nodeName)
        delete(r.cancelledDataUpload, du.Name)
    }
}

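// tryCancelDataUpload attempts to move the DataUpload directly to the Canceled phase with an exclusive
// update; it returns false if the update fails or loses a conflict, so the caller can retry later.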
func (r *DataUploadReconciler) tryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) bool {
    log := r.logger.WithField("dataupload", du.Name)
    succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(dataUpload *velerov2alpha1api.DataUpload) {
        dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
        if dataUpload.Status.StartTimestamp.IsZero() {
            dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
        }
        dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

        if message != "" {
            dataUpload.Status.Message = message
        }
    })

    if err != nil {
        log.WithError(err).Error("error updating dataupload status")
        return false
    } else if !succeeded {
        log.Warn("conflict in updating dataupload status and will try it again later")
        return false
    }

    // the update succeeded
    r.metrics.RegisterDataUploadCancel(r.nodeName)
    // clean up any objects generated during the snapshot expose
    r.cleanUp(ctx, du, log)

    log.Warn("data upload is canceled")

    return true
}

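// cleanUp removes any objects generated while exposing the snapshot for the given DataUpload.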
func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) {
    ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
    if !ok {
        log.WithError(fmt.Errorf("%v type of snapshot exposer does not exist", du.Spec.SnapshotType)).
            Warn("Failed to clean up resources on cancel")
    } else {
        var volumeSnapshotName string
        if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposers should have their own conditions
            volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot
        }
        ep.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace)
    }
}

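// OnDataUploadProgress is the data path progress callback: it records the transferred bytes in the DataUpload status.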
func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) {
    log := r.logger.WithField("dataupload", duName)

    if err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: duName}, log, func(du *velerov2alpha1api.DataUpload) bool {
        du.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
        return true
    }); err != nil {
        log.WithError(err).Error("Failed to update progress")
    }
}

// SetupWithManager registers the DataUpload controller.
// A newly created DataUpload CR first triggers one of the dataupload controllers to create a backup pod, which can
// take a long time and may fail or end in an unknown state. The request then leaves the Reconcile queue immediately
// so it does not block the handling of other CRs. To finish the rest of the data upload process, the related request
// is re-enqueued once the backup pod is running. The predicates below avoid handling unwanted pod events and avoid
// blocking the handling of other CRs.
func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
    gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
        du := object.(*velerov2alpha1api.DataUpload)
        if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
            return true
        }

        if du.Spec.Cancel && !isDataUploadInFinalState(du) {
            return true
        }

        if isDataUploadInFinalState(du) && !du.DeletionTimestamp.IsZero() {
            return true
        }

        return false
    })
    s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataUpload), r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{
        Predicates: []predicate.Predicate{gp},
    })

    return ctrl.NewControllerManagedBy(mgr).
        For(&velerov2alpha1api.DataUpload{}).
        WatchesRawSource(s).
        Watches(&corev1api.Pod{}, kube.EnqueueRequestsFromMapUpdateFunc(r.findDataUploadForPod),
            builder.WithPredicates(predicate.Funcs{
                UpdateFunc: func(ue event.UpdateEvent) bool {
                    newObj := ue.ObjectNew.(*corev1api.Pod)

                    if _, ok := newObj.Labels[velerov1api.DataUploadLabel]; !ok {
                        return false
                    }

                    if newObj.Spec.NodeName != r.nodeName {
                        return false
                    }

                    return true
                },
                CreateFunc: func(event.CreateEvent) bool {
                    return false
                },
                DeleteFunc: func(de event.DeleteEvent) bool {
                    return false
                },
                GenericFunc: func(ge event.GenericEvent) bool {
                    return false
                },
            })).
        Complete(r)
}

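// findDataUploadForPod maps backup pod events to reconcile requests: when the pod is running it marks the
// DataUpload as Prepared, and when the pod is unrecoverable it marks the DataUpload for cancellation.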
func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj client.Object) []reconcile.Request {
    pod := podObj.(*corev1api.Pod)
    du, err := findDataUploadByPod(r.client, *pod)
    log := r.logger.WithFields(logrus.Fields{
        "Backup pod": pod.Name,
    })

    if err != nil {
        log.WithError(err).Error("unable to get dataupload")
        return []reconcile.Request{}
    } else if du == nil {
        log.Error("got empty DataUpload")
        return []reconcile.Request{}
    }
    log = log.WithFields(logrus.Fields{
        "Dataupload": du.Name,
    })

    if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
        return []reconcile.Request{}
    }

    if pod.Status.Phase == corev1api.PodRunning {
        log.Info("Preparing dataupload")
        if err = UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log,
            func(du *velerov2alpha1api.DataUpload) bool {
                if isDataUploadInFinalState(du) {
                    log.Warnf("dataupload %s is terminated, abort setting it to prepared", du.Name)
                    return false
                }

                r.prepareDataUpload(du)
                return true
            }); err != nil {
            log.WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
            return []reconcile.Request{}
        }
    } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // fail the abnormal backup pod early
        err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
            func(dataUpload *velerov2alpha1api.DataUpload) bool {
                if dataUpload.Spec.Cancel {
                    return false
                }

                dataUpload.Spec.Cancel = true
                dataUpload.Status.Message = fmt.Sprintf("Cancel dataupload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)

                return true
            })

        if err != nil {
            log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
            return []reconcile.Request{}
        }
        log.Infof("Exposed pod is in abnormal status (reason %s) and dataupload is marked as cancel", reason)
    } else {
        return []reconcile.Request{}
    }

    request := reconcile.Request{
        NamespacedName: types.NamespacedName{
            Namespace: du.Namespace,
            Name:      du.Name,
        },
    }
    return []reconcile.Request{request}
}

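// prepareDataUpload moves the DataUpload to the Prepared phase and pins it to the current node.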
func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) {
    du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
    du.Status.Node = r.nodeName
}

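// errorOut cleans up the exposed snapshot resources and then marks the DataUpload as Failed.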
func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
    if se, ok := r.snapshotExposerList[du.Spec.SnapshotType]; ok {
        var volumeSnapshotName string
        if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposers should have their own conditions
            volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot
        }
        se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace)
    } else {
        log.Errorf("failed to clean up exposed snapshot: could not find %s snapshot exposer", du.Spec.SnapshotType)
    }

    return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log)
}

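// updateStatusToFailed sets the DataUpload phase to Failed with the wrapped error message and timestamps,
// then returns the original error.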
func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) error {
    log.Info("update data upload status to Failed")

    if patchErr := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log, func(du *velerov2alpha1api.DataUpload) bool {
        if isDataUploadInFinalState(du) {
            return false
        }

        du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
        du.Status.Message = errors.WithMessage(err, msg).Error()
        if du.Status.StartTimestamp.IsZero() {
            du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
        }

        if dataPathError, ok := err.(datapath.DataPathError); ok {
            du.Status.SnapshotID = dataPathError.GetSnapshotID()
        }
        du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

        return true
    }); patchErr != nil {
        log.WithError(patchErr).Error("error updating DataUpload status")
    } else {
        r.metrics.RegisterDataUploadFailure(r.nodeName)
    }

    return err
}

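// acceptDataUpload tries to claim the DataUpload for this node by setting the Accepted phase with an
// exclusive update; it returns true only if this controller won the update.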
func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) {
    r.logger.Infof("Accepting data upload %s", du.Name)

    // The data upload controllers in all node-agents try to update the dataupload CR, but only one succeeds,
    // and the successful one handles the subsequent logic
    updated := du.DeepCopy()

    updateFunc := func(dataUpload *velerov2alpha1api.DataUpload) {
        dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted
        dataUpload.Status.AcceptedByNode = r.nodeName
        dataUpload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}
    }

    succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, updated, updateFunc)

    if err != nil {
        return false, err
    }

    if succeeded {
        updateFunc(du) // If the update succeeded, the du values in memory need to be updated as well
        r.logger.WithField("Dataupload", du.Name).Infof("This dataupload has been accepted by %s", r.nodeName)
        return true, nil
    }

    r.logger.WithField("Dataupload", du.Name).Info("This dataupload has been accepted by others")
    return false, nil
}

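// onPrepareTimeout fails a DataUpload that has stayed in the Accepted phase longer than the preparing
// timeout, logs the expose diagnostics, and cleans up the expose resources.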
func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) {
    log := r.logger.WithField("Dataupload", du.Name)

    log.Info("Timeout happened for preparing dataupload")

    succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(du *velerov2alpha1api.DataUpload) {
        du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
        du.Status.Message = "timeout on preparing data upload"
    })

    if err != nil {
        log.WithError(err).Warn("Failed to update dataupload")
        return
    }

    if !succeeded {
        log.Warn("Dataupload has been updated by others")
        return
    }

    ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
    if !ok {
        log.WithError(fmt.Errorf("%v type of snapshot exposer does not exist", du.Spec.SnapshotType)).
            Warn("Failed to clean up resources on prepare timeout")
    } else {
        var volumeSnapshotName string
        if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposers should have their own conditions
            volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot
        }

        diags := strings.Split(ep.DiagnoseExpose(ctx, getOwnerObject(du)), "\n")
        for _, diag := range diags {
            log.Warnf("[Diagnose DU expose]%s", diag)
        }

        ep.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace)

        log.Info("Dataupload has been cleaned up")
    }

    r.metrics.RegisterDataUploadFailure(r.nodeName)
}

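// funcExclusiveUpdateDataUpload points to exclusiveUpdateDataUpload; the indirection presumably allows
// tests to substitute the exclusive-update behavior.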
var funcExclusiveUpdateDataUpload = exclusiveUpdateDataUpload

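// exclusiveUpdateDataUpload applies updateFunc to du and updates it with optimistic concurrency control:
// it returns (true, nil) on success, (false, nil) on an update conflict, and (false, err) on any other error.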
func exclusiveUpdateDataUpload(ctx context.Context, cli client.Client, du *velerov2alpha1api.DataUpload,
    updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) {
    updateFunc(du)

    err := cli.Update(ctx, du)
    if err == nil {
        return true, nil
    }

    // Note: the du values in memory are not rolled back on error
    if apierrors.IsConflict(err) {
        return false, nil
    } else {
        return false, err
    }
}

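// closeDataPath closes and removes the async backup/restore instance for the given DataUpload, if any.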
func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string) {
    asyncBR := r.dataPathMgr.GetAsyncBR(duName)
    if asyncBR != nil {
        asyncBR.Close(ctx)
    }

    r.dataPathMgr.RemoveAsyncBR(duName)
}

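// setupExposeParam builds the exposer parameters for the DataUpload; for the CSI snapshot type it resolves
// the source PVC, the node OS, the access mode, and any third-party labels/annotations for the hosting pod.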
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (any, error) {
    log := r.logger.WithField("dataupload", du.Name)

    if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
        pvc := &corev1api.PersistentVolumeClaim{}
        err := r.client.Get(context.Background(), types.NamespacedName{
            Namespace: du.Spec.SourceNamespace,
            Name:      du.Spec.SourcePVC,
        }, pvc)

        if err != nil {
            return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
        }

        nodeOS := kube.GetPVCAttachingNodeOS(pvc, r.kubeClient.CoreV1(), r.kubeClient.StorageV1(), log)

        if err := kube.HasNodeWithOS(context.Background(), nodeOS, r.kubeClient.CoreV1()); err != nil {
            return nil, errors.Wrapf(err, "no appropriate node to run data upload for PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
        }

        accessMode := exposer.AccessModeFileSystem
        if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1api.PersistentVolumeBlock {
            accessMode = exposer.AccessModeBlock
        }

        hostingPodLabels := map[string]string{velerov1api.DataUploadLabel: du.Name}
        for _, k := range util.ThirdPartyLabels {
            if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, du.Namespace, k, nodeOS); err != nil {
                if err != nodeagent.ErrNodeAgentLabelNotFound {
                    log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
                }
            } else {
                hostingPodLabels[k] = v
            }
        }

        hostingPodAnnotation := map[string]string{}
        for _, k := range util.ThirdPartyAnnotations {
            if v, err := nodeagent.GetAnnotationValue(context.Background(), r.kubeClient, du.Namespace, k, nodeOS); err != nil {
                if err != nodeagent.ErrNodeAgentAnnotationNotFound {
                    log.WithError(err).Warnf("Failed to check node-agent annotation, skip adding host pod annotation %s", k)
                }
            } else {
                hostingPodAnnotation[k] = v
            }
        }

        return &exposer.CSISnapshotExposeParam{
            SnapshotName:          du.Spec.CSISnapshot.VolumeSnapshot,
            SourceNamespace:       du.Spec.SourceNamespace,
            StorageClass:          du.Spec.CSISnapshot.StorageClass,
            HostingPodLabels:      hostingPodLabels,
            HostingPodAnnotations: hostingPodAnnotation,
            AccessMode:            accessMode,
            OperationTimeout:      du.Spec.OperationTimeout.Duration,
            ExposeTimeout:         r.preparingTimeout,
            VolumeSize:            pvc.Spec.Resources.Requests[corev1api.ResourceStorage],
            Affinity:              r.loadAffinity,
            BackupPVCConfig:       r.backupPVCConfig,
            Resources:             r.podResources,
            NodeOS:                nodeOS,
        }, nil
    }

    return nil, nil
}

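// setupWaitExposePara builds the wait parameters passed to the exposer's GetExposed call.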
func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) any {
    if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
        return &exposer.CSISnapshotExposeWaitParam{
            NodeClient: r.client,
            NodeName:   r.nodeName,
        }
    }
    return nil
}

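// getOwnerObject returns an ObjectReference to the DataUpload, used as the owner of the exposed resources.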
func getOwnerObject(du *velerov2alpha1api.DataUpload) corev1api.ObjectReference {
    return corev1api.ObjectReference{
        Kind:       du.Kind,
        Namespace:  du.Namespace,
        Name:       du.Name,
        UID:        du.UID,
        APIVersion: du.APIVersion,
    }
}

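// findDataUploadByPod looks up the DataUpload referenced by the pod's DataUpload label, if present.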
func findDataUploadByPod(client client.Client, pod corev1api.Pod) (*velerov2alpha1api.DataUpload, error) {
    if label, exist := pod.Labels[velerov1api.DataUploadLabel]; exist {
        du := &velerov2alpha1api.DataUpload{}
        err := client.Get(context.Background(), types.NamespacedName{
            Namespace: pod.Namespace,
            Name:      label,
        }, du)

        if err != nil {
            return nil, errors.Wrapf(err, "error to find DataUpload by pod %s/%s", pod.Namespace, pod.Name)
        }
        return du, nil
    }
    return nil, nil
}

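// isDataUploadInFinalState reports whether the DataUpload is in a terminal phase (Failed, Canceled or Completed).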
func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool {
    return du.Status.Phase == velerov2alpha1api.DataUploadPhaseFailed ||
        du.Status.Phase == velerov2alpha1api.DataUploadPhaseCanceled ||
        du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted
}

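// UpdateDataUploadWithRetry fetches the DataUpload, applies updateFunc and updates it, retrying every second
// on update conflicts until the context is canceled; updateFunc returns false to skip the update.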
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataUpload) bool) error {
    return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
        du := &velerov2alpha1api.DataUpload{}
        if err := client.Get(ctx, namespacedName, du); err != nil {
            return false, errors.Wrap(err, "getting DataUpload")
        }

        if updateFunc(du) {
            err := client.Update(ctx, du)
            if err != nil {
                if apierrors.IsConflict(err) {
                    log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name)
                    return false, nil
                } else {
                    return false, errors.Wrapf(err, "error updating dataupload %s/%s", du.Namespace, du.Name)
                }
            }
        }

        return true, nil
    })
}

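// funcResumeCancellableDataBackup points to resumeCancellableDataPath; the indirection presumably allows
// tests to substitute the resume behavior.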
var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath

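// AttemptDataUploadResume is called on node-agent startup: for each InProgress DataUpload owned by this node
// it tries to resume the data path, and marks the DataUpload for cancellation if the resume fails.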
func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
    dataUploads := &velerov2alpha1api.DataUploadList{}
    if err := r.client.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
        r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
        return errors.Wrapf(err, "error to list datauploads")
    }

    for i := range dataUploads.Items {
        du := &dataUploads.Items[i]
        if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
            if du.Status.Node != r.nodeName {
                logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node)
                continue
            }

            err := funcResumeCancellableDataBackup(r, ctx, du, logger)
            if err == nil {
                logger.WithField("du", du.Name).WithField("current node", r.nodeName).Info("Completed resuming in-progress DU")
                continue
            }

            logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it")

            resumeErr := err
            err = UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
                func(dataUpload *velerov2alpha1api.DataUpload) bool {
                    if dataUpload.Spec.Cancel {
                        return false
                    }

                    dataUpload.Spec.Cancel = true
                    dataUpload.Status.Message = fmt.Sprintf("Resume InProgress dataupload failed with error %v, mark it as cancel", resumeErr)

                    return true
                })
            if err != nil {
                logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
            }
        } else {
            // a Prepared CR can still be handled by the dataupload controller after a node-agent restart;
            // an Accepted CR may also survive a node-agent restart as long as the intermediate objects are all done
            logger.WithField("dataupload", du.GetName()).Infof("Found a dataupload with status %s", du.Status.Phase)
        }
    }

    return nil
}

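// resumeCancellableDataPath re-attaches to the exposed snapshot of an InProgress DataUpload after a restart
// and restarts the async backup watcher against it.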
func (r *DataUploadReconciler) resumeCancellableDataPath(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) error {
    log.Info("Resume cancelable dataUpload")

    ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
    if !ok {
        return errors.Errorf("error to find exposer for du %s", du.Name)
    }

    waitExposePara := r.setupWaitExposePara(du)
    res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara)
    if err != nil {
        return errors.Wrapf(err, "error to get exposed snapshot for du %s", du.Name)
    }

    if res == nil {
        return errors.Errorf("expose info missed for du %s", du.Name)
    }

    callbacks := datapath.Callbacks{
        OnCompleted: r.OnDataUploadCompleted,
        OnFailed:    r.OnDataUploadFailed,
        OnCancelled: r.OnDataUploadCancelled,
        OnProgress:  r.OnDataUploadProgress,
    }

    asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, true, log)
    if err != nil {
        return errors.Wrapf(err, "error to create asyncBR watcher for du %s", du.Name)
    }

    resumeComplete := false
    defer func() {
        if !resumeComplete {
            r.closeDataPath(ctx, du.Name)
        }
    }()

    if err := asyncBR.Init(ctx, nil); err != nil {
        return errors.Wrapf(err, "error to init asyncBR watcher for du %s", du.Name)
    }

    if err := asyncBR.StartBackup(datapath.AccessPoint{
        ByPath: res.ByPod.VolumeName,
    }, du.Spec.DataMoverConfig, nil); err != nil {
        return errors.Wrapf(err, "error to resume asyncBR watcher for du %s", du.Name)
    }

    resumeComplete = true

    log.Infof("asyncBR is resumed for du %s", du.Name)

    return nil
}