Add data download controller

Signed-off-by: Ming <mqiu@vmware.com>
pull/6436/head
Ming 2023-06-26 15:53:21 +00:00
parent ee22125f9c
commit 1bfcee776c
7 changed files with 988 additions and 0 deletions

View File

@ -0,0 +1 @@
Add data download controller for data mover

View File

@ -0,0 +1,103 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)
// DataDownloadBuilder builds DataDownload objects
type DataDownloadBuilder struct {
object *velerov2alpha1api.DataDownload
}
// ForDataDownload is the constructor for a DataDownloadBuilder.
func ForDataDownload(ns, name string) *DataDownloadBuilder {
return &DataDownloadBuilder{
object: &velerov2alpha1api.DataDownload{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov2alpha1api.SchemeGroupVersion.String(),
Kind: "DataDownloadload",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
},
}
}
// Result returns the built DataDownload.
func (d *DataDownloadBuilder) Result() *velerov2alpha1api.DataDownload {
return d.object
}
// BackupStorageLocation sets the DataDownload's backup storage location.
func (d *DataDownloadBuilder) BackupStorageLocation(name string) *DataDownloadBuilder {
d.object.Spec.BackupStorageLocation = name
return d
}
// Phase sets the DataDownload's phase.
func (d *DataDownloadBuilder) Phase(phase velerov2alpha1api.DataDownloadPhase) *DataDownloadBuilder {
d.object.Status.Phase = phase
return d
}
// SnapshotID sets the DataDownload's SnapshotID.
func (d *DataDownloadBuilder) SnapshotID(id string) *DataDownloadBuilder {
d.object.Spec.SnapshotID = id
return d
}
// DataMover sets the DataDownload's DataMover.
func (d *DataDownloadBuilder) DataMover(dataMover string) *DataDownloadBuilder {
d.object.Spec.DataMover = dataMover
return d
}
// SourceNamespace sets the DataDownload's SourceNamespace.
func (d *DataDownloadBuilder) SourceNamespace(sourceNamespace string) *DataDownloadBuilder {
d.object.Spec.SourceNamespace = sourceNamespace
return d
}
// TargetVolume sets the DataDownload's TargetVolume.
func (d *DataDownloadBuilder) TargetVolume(targetVolume velerov2alpha1api.TargetVolumeSpec) *DataDownloadBuilder {
d.object.Spec.TargetVolume = targetVolume
return d
}
// Cancel sets the DataDownload's Cancel.
func (d *DataDownloadBuilder) Cancel(cancel bool) *DataDownloadBuilder {
d.object.Spec.Cancel = cancel
return d
}
// OperationTimeout sets the DataDownload's OperationTimeout.
func (d *DataDownloadBuilder) OperationTimeout(timeout metav1.Duration) *DataDownloadBuilder {
d.object.Spec.OperationTimeout = timeout
return d
}
// DataMoverConfig sets the DataDownload's DataMoverConfig.
func (d *DataDownloadBuilder) DataMoverConfig(config *map[string]string) *DataDownloadBuilder {
d.object.Spec.DataMoverConfig = *config
return d
}

View File

@ -260,6 +260,10 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}
if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
s.logger.Info("Controllers starting...")
if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil {

View File

@ -0,0 +1,487 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
datamover "github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
repository "github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// DataDownloadReconciler reconciles a DataDownload object
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
repositoryEnsurer *repository.Ensurer
dataPathMgr *datapath.Manager
}
func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, logger logrus.FieldLogger) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
nodeName: nodeName,
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: datapath.NewManager(1),
}
}
// +kubebuilder:rbac:groups=velero.io,resources=datadownloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=datadownloads/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get
func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.logger.WithFields(logrus.Fields{
"controller": "datadownload",
"datadownload": req.NamespacedName,
})
log.Infof("Reconcile %s", req.Name)
dd := &velerov2alpha1api.DataDownload{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, dd); err != nil {
if apierrors.IsNotFound(err) {
log.Warn("DataDownload not found, skip")
return ctrl.Result{}, nil
}
log.WithError(err).Error("Unable to get the DataDownload")
return ctrl.Result{}, err
}
if dd.Spec.DataMover != "" && dd.Spec.DataMover != dataMoverType {
log.WithField("data mover", dd.Spec.DataMover).Info("it is not one built-in data mover which is not supported by Velero")
return ctrl.Result{}, nil
}
if r.restoreExposer == nil {
return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log)
}
if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew {
log.Info("Data download starting")
if _, err := r.getTargetPVC(ctx, dd); err != nil {
return ctrl.Result{Requeue: true}, nil
}
accepted, err := r.acceptDataDownload(ctx, dd)
if err != nil {
return r.errorOut(ctx, dd, err, "error to accept the data download", log)
}
if !accepted {
log.Debug("Data download is not accepted")
return ctrl.Result{}, nil
}
log.Info("Data download is accepted")
hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
if err != nil {
return r.errorOut(ctx, dd, err, "error to start restore expose", log)
}
log.Info("Restore is exposed")
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")
fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsRestore != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
result, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration)
if err != nil {
return r.errorOut(ctx, dd, err, "restore exposer is not ready", log)
} else if result == nil {
log.Debug("Get empty restore exposer")
return ctrl.Result{}, nil
}
log.Info("Restore PVC is ready")
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
}
log.Info("Data download is marked as in progress")
return r.runCancelableDataPath(ctx, dd, result, log)
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel {
fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsRestore == nil {
return ctrl.Result{}, nil
}
log.Info("Data download is being canceled")
// Update status to Canceling.
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
return ctrl.Result{}, err
}
fsRestore.Cancel()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
} else {
log.Debugf("Data download now is in %s phase and do nothing by current %s controller", dd.Status.Phase, r.nodeName)
return ctrl.Result{}, nil
}
}
func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Creating data path routine")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataDownloadCompleted,
OnFailed: r.OnDataDownloadFailed,
OnCancelled: r.OnDataDownloadCancelled,
OnProgress: r.OnDataDownloadProgress,
}
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("runCancelableDataDownload is concurrent limited")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log)
}
log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsRestore.Init(ctx, dd.Spec.BackupStorageLocation, dd.Spec.SourceNamespace, datamover.GetUploaderType(dd.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repositoryEnsurer, r.credentialGetter); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize data path", log)
}
log.WithField("path", path.ByPath).Info("fs init")
if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting data path %s restore", path.ByPath), log)
}
log.WithField("path", path.ByPath).Info("Async fs restore data path started")
return ctrl.Result{}, nil
}
func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.Info("Async fs restore data path completed")
var dd velerov2alpha1api.DataDownload
if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil {
log.WithError(err).Warn("Failed to get datadownload on completion")
return
}
objRef := getDataDownloadOwnerObject(&dd)
err := r.restoreExposer.RebindVolume(ctx, objRef, dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, dd.Spec.OperationTimeout.Duration)
if err != nil {
log.WithError(err).Error("Failed to rebind PV to target PVC on completion")
return
}
log.Info("Cleaning up exposed environment")
r.restoreExposer.CleanUp(ctx, objRef)
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
} else {
log.Infof("Data download is marked as %s", dd.Status.Phase)
}
}
func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.WithError(err).Error("Async fs restore data path failed")
var dd velerov2alpha1api.DataDownload
if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
log.WithError(getErr).Warn("Failed to get data download on failure")
} else {
if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil {
log.WithError(err).Warnf("Failed to patch data download with err %v", errOut)
}
}
}
func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.Warn("Async fs backup data path canceled")
var dd velerov2alpha1api.DataDownload
if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
log.WithError(getErr).Warn("Failed to get datadownload on cancel")
} else {
// cleans up any objects generated during the snapshot expose
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd))
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
}
}
}
func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
log := r.logger.WithField("datadownload", ddName)
var dd velerov2alpha1api.DataDownload
if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil {
log.WithError(err).Warn("Failed to get data download on progress")
return
}
original := dd.DeepCopy()
dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Failed to update restore snapshot progress")
}
}
// SetupWithManager registers the DataDownload controller.
// The fresh new DataDownload CR first created will trigger to create one pod (long time, maybe failure or unknown status) by one of the datadownload controllers
// then the request will get out of the Reconcile queue immediately by not blocking others' CR handling, in order to finish the rest data download process we need to
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&velerov2alpha1api.DataDownload{}).
Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
newObj := ue.ObjectNew.(*v1.Pod)
if _, ok := newObj.Labels[velerov1api.DataDownloadLabel]; !ok {
return false
}
if newObj.Status.Phase != v1.PodRunning {
return false
}
if newObj.Spec.NodeName == "" {
return false
}
return true
},
CreateFunc: func(event.CreateEvent) bool {
return false
},
DeleteFunc: func(de event.DeleteEvent) bool {
return false
},
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
})).
Complete(r)
}
func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)
dd := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataDownloadLabel],
}, dd)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
}
if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
return []reconcile.Request{}
}
requests := make([]reconcile.Request, 1)
r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
err = r.patchDataDownload(context.Background(), dd, prepareDataDownload)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to patch data download")
return []reconcile.Request{}
}
requests[0] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: dd.Namespace,
Name: dd.Name,
},
}
return requests
}
func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
original := req.DeepCopy()
mutate(req)
if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil {
return errors.Wrap(err, "error patching data download")
}
return nil
}
func prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
}
func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
return ctrl.Result{}, r.updateStatusToFailed(ctx, dd, err, msg, log)
}
func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error {
log.Infof("update data download status to %v", dd.Status.Phase)
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
if err = r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataDownload status")
return err
}
return nil
}
func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
updated := dd.DeepCopy()
updated.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
r.logger.Infof("Accepting snapshot restore %s", dd.Name)
// For all data download controller in each node-agent will try to update download CR, and only one controller will success,
// and the success one could handle later logic
err := r.client.Update(ctx, updated)
if err == nil {
return true, nil
} else if apierrors.IsConflict(err) {
r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others")
return false, nil
} else {
return false, err
}
}
func (r *DataDownloadReconciler) getTargetPVC(ctx context.Context, dd *velerov2alpha1api.DataDownload) (*v1.PersistentVolumeClaim, error) {
return r.kubeClient.CoreV1().PersistentVolumeClaims(dd.Spec.TargetVolume.Namespace).Get(ctx, dd.Spec.TargetVolume.PVC, metav1.GetOptions{})
}
func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(ddName)
if fsBackup != nil {
fsBackup.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(ddName)
}
func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectReference {
return v1.ObjectReference{
Kind: dd.Kind,
Namespace: dd.Namespace,
Name: dd.Name,
UID: dd.UID,
APIVersion: dd.APIVersion,
}
}

View File

@ -0,0 +1,211 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"testing"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgofake "k8s.io/client-go/kubernetes/fake"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks"
)
const dataDownloadName string = "datadownload-1"
func dataDownloadBuilder() *builder.DataDownloadBuilder {
return builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).
BackupStorageLocation("bsl-loc").
DataMover("velero").
SnapshotID("test-snapshot-id").TargetVolume(velerov2alpha1api.TargetVolumeSpec{
PV: "test-pv",
PVC: "test-pvc",
Namespace: "test-ns",
})
}
func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
if err != nil {
return nil, err
}
err = velerov2alpha1api.AddToScheme(scheme)
if err != nil {
return nil, err
}
err = corev1.AddToScheme(scheme)
if err != nil {
return nil, err
}
fakeClient := &FakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
}
if len(needError) == 4 {
fakeClient.getError = needError[0]
fakeClient.createError = needError[1]
fakeClient.updateError = needError[2]
fakeClient.patchError = needError[3]
}
fakeKubeClient := clientgofake.NewSimpleClientset(objects...)
fakeFS := velerotest.NewFakeFileSystem()
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataDownloadName)
_, err = fakeFS.Create(pathGlob)
if err != nil {
return nil, err
}
credentialFileStore, err := credentials.NewNamespacedFileStore(
fakeClient,
velerov1api.DefaultNamespace,
"/tmp/credentials",
fakeFS,
)
if err != nil {
return nil, err
}
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", velerotest.NewLogger()), nil
}
func TestDataDownloadReconcile(t *testing.T) {
tests := []struct {
name string
dd *velerov2alpha1api.DataDownload
targetPVC *corev1.PersistentVolumeClaim
dataMgr *datapath.Manager
needErrs []bool
isExposeErr bool
isGetExposeErr bool
expectedStatusMsg string
}{
{
name: "Restore is exposed",
dd: dataDownloadBuilder().Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
},
{
name: "Get empty restore exposer",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
},
{
name: "Failed to get restore exposer",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectedStatusMsg: "Error to get restore exposer",
isGetExposeErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r, err := initDataDownloadReconciler([]runtime.Object{test.targetPVC}, test.needErrs...)
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{})
if test.targetPVC != nil {
r.client.Delete(ctx, test.targetPVC, &kbclient.DeleteOptions{})
}
}()
ctx := context.Background()
if test.dd.Namespace == velerov1api.DefaultNamespace {
err = r.client.Create(ctx, test.dd)
require.NoError(t, err)
}
if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
r.dataPathMgr = datapath.NewManager(1)
}
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
return datapathmockes.NewAsyncBR(t)
}
if test.isExposeErr || test.isGetExposeErr {
r.restoreExposer = func() exposer.GenericRestoreExposer {
ep := exposermockes.NewGenericRestoreExposer(t)
if test.isExposeErr {
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
}
if test.isGetExposeErr {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer"))
}
ep.On("CleanUp", mock.Anything, mock.Anything).Return()
return ep
}()
}
if test.dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
if fsBR := r.dataPathMgr.GetAsyncBR(test.dd.Name); fsBR == nil {
_, err := r.dataPathMgr.CreateFileSystemBR(test.dd.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, velerotest.NewLogger())
require.NoError(t, err)
}
}
actualResult, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: velerov1api.DefaultNamespace,
Name: test.dd.Name,
},
})
require.Nil(t, err)
require.NotNil(t, actualResult)
dd := velerov2alpha1api.DataDownload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.dd.Name,
Namespace: test.dd.Namespace,
}, &dd)
if test.isGetExposeErr {
assert.Contains(t, dd.Status.Message, test.expectedStatusMsg)
}
require.Nil(t, err)
t.Logf("%s: \n %v \n", test.name, dd)
})
}
}

View File

@ -0,0 +1,86 @@
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
context "context"
credentials "github.com/vmware-tanzu/velero/internal/credentials"
datapath "github.com/vmware-tanzu/velero/pkg/datapath"
mock "github.com/stretchr/testify/mock"
repository "github.com/vmware-tanzu/velero/pkg/repository"
)
// AsyncBR is an autogenerated mock type for the AsyncBR type
type AsyncBR struct {
mock.Mock
}
// Cancel provides a mock function with given fields:
func (_m *AsyncBR) Cancel() {
_m.Called()
}
// Close provides a mock function with given fields: ctx
func (_m *AsyncBR) Close(ctx context.Context) {
_m.Called(ctx)
}
// Init provides a mock function with given fields: ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter
func (_m *AsyncBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
ret := _m.Called(ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, string, *repository.Ensurer, *credentials.CredentialGetter) error); ok {
r0 = rf(ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter)
} else {
r0 = ret.Error(0)
}
return r0
}
// StartBackup provides a mock function with given fields: source, realSource, parentSnapshot, forceFull, tags
func (_m *AsyncBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
ret := _m.Called(source, realSource, parentSnapshot, forceFull, tags)
var r0 error
if rf, ok := ret.Get(0).(func(datapath.AccessPoint, string, string, bool, map[string]string) error); ok {
r0 = rf(source, realSource, parentSnapshot, forceFull, tags)
} else {
r0 = ret.Error(0)
}
return r0
}
// StartRestore provides a mock function with given fields: snapshotID, target
func (_m *AsyncBR) StartRestore(snapshotID string, target datapath.AccessPoint) error {
ret := _m.Called(snapshotID, target)
var r0 error
if rf, ok := ret.Get(0).(func(string, datapath.AccessPoint) error); ok {
r0 = rf(snapshotID, target)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewAsyncBR interface {
mock.TestingT
Cleanup(func())
}
// NewAsyncBR creates a new instance of AsyncBR. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAsyncBR(t mockConstructorTestingTNewAsyncBR) *AsyncBR {
mock := &AsyncBR{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,96 @@
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
context "context"
client "sigs.k8s.io/controller-runtime/pkg/client"
exposer "github.com/vmware-tanzu/velero/pkg/exposer"
mock "github.com/stretchr/testify/mock"
time "time"
v1 "k8s.io/api/core/v1"
)
// GenericRestoreExposer is an autogenerated mock type for the GenericRestoreExposer type
type GenericRestoreExposer struct {
mock.Mock
}
// CleanUp provides a mock function with given fields: _a0, _a1
func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectReference) {
_m.Called(_a0, _a1)
}
// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetExposed provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *GenericRestoreExposer) GetExposed(_a0 context.Context, _a1 v1.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
var r0 *exposer.ExposeResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok {
return rf(_a0, _a1, _a2, _a3, _a4)
}
if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) *exposer.ExposeResult); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*exposer.ExposeResult)
}
}
if rf, ok := ret.Get(1).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) error); ok {
r1 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RebindVolume provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *GenericRestoreExposer) RebindVolume(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 time.Duration) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, time.Duration) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewGenericRestoreExposer interface {
mock.TestingT
Cleanup(func())
}
// NewGenericRestoreExposer creates a new instance of GenericRestoreExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewGenericRestoreExposer(t mockConstructorTestingTNewGenericRestoreExposer) *GenericRestoreExposer {
mock := &GenericRestoreExposer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}