vgdp ms pvr data path

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/9005/head
Lyndon-Li 2025-06-06 12:42:10 +08:00
parent 3f830a7b19
commit d795f41a47
7 changed files with 546 additions and 61 deletions

View File

@ -0,0 +1 @@
Fix issue #8988, add data path for VGDP ms PVR

View File

@ -77,6 +77,11 @@ spec:
BackupStorageLocation is the name of the backup storage location
where the backup repository is stored.
type: string
cancel:
description: |-
Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
when the PodVolumeRestore is in InProgress phase
type: boolean
pod:
description: Pod is a reference to the pod containing the volume to
be restored.

File diff suppressed because one or more lines are too long

View File

@ -31,6 +31,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.AddCommand(
NewBackupCommand(f),
NewRestoreCommand(f),
)
return command

View File

@ -33,7 +33,6 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
@ -42,24 +41,24 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
type pvrConfig struct {
type podVolumeRestoreConfig struct {
volumePath string
pvrName string
resourceTimeout time.Duration
winHPC bool
}
func NewRestoreCommand(f client.Factory) *cobra.Command {
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
formatFlag := logging.NewFormatFlag()
config := pvrConfig{}
config := podVolumeRestoreConfig{}
command := &cobra.Command{
Use: "restore",
@ -76,7 +75,7 @@ func NewRestoreCommand(f client.Factory) *cobra.Command {
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s, err := newPodVolumeRestore(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create pod volume restore, %v", err)
kube.ExitPodWithMessage(logger, false, "Failed to create pod volume restore, %v", err)
}
s.run()
@ -104,12 +103,12 @@ type podVolumeRestore struct {
cache ctlcache.Cache
namespace string
nodeName string
config pvrConfig
config podVolumeRestoreConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}
func newPodVolumeRestore(logger logrus.FieldLogger, factory client.Factory, config pvrConfig) (*podVolumeRestore, error) {
func newPodVolumeRestore(logger logrus.FieldLogger, factory client.Factory, config podVolumeRestoreConfig) (*podVolumeRestore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
clientConfig, err := factory.ClientConfig()
@ -233,6 +232,8 @@ func (s *podVolumeRestore) runDataPath() {
return
}
s.logger.Infof("Running data path service %s", s.config.pvrName)
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
dpService.Shutdown()
@ -270,7 +271,7 @@ func (s *podVolumeRestore) createDataPathService() (dataPathService, error) {
credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
pvrInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
pvrInformer, err := s.cache.GetInformer(s.ctx, &velerov1api.PodVolumeRestore{})
if err != nil {
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
}

View File

@ -37,7 +37,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
cachetool "k8s.io/client-go/tools/cache"
@ -61,14 +60,14 @@ type RestoreMicroService struct {
resultSignal chan dataPathResult
ddInformer cache.Informer
pvrHandler cachetool.ResourceEventHandlerRegistration
nodeName string
pvrInformer cache.Informer
pvrHandler cachetool.ResourceEventHandlerRegistration
nodeName string
}
func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvrName string, namespace string, nodeName string,
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
ddInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
pvrInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
return &RestoreMicroService{
ctx: ctx,
client: client,
@ -82,14 +81,14 @@ func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClien
sourceTargetPath: sourceTargetPath,
nodeName: nodeName,
resultSignal: make(chan dataPathResult),
ddInformer: ddInformer,
pvrInformer: pvrInformer,
}
}
func (r *RestoreMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvrName, r.nodeName, r.logger)
handler, err := r.ddInformer.AddEventHandler(
handler, err := r.pvrInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj any, newObj any) {
oldPvr := oldObj.(*velerov1api.PodVolumeRestore)
@ -165,7 +164,8 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
return "", errors.Wrap(err, "error to create data path")
}
log.Debug("Found volume path")
log.Debug("Async fs br created")
if err := fsRestore.Init(ctx,
&datapath.FSBRInitParam{
BSLName: pvr.Spec.BackupStorageLocation,
@ -178,7 +178,8 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
}); err != nil {
return "", errors.Wrap(err, "error to initialize data path")
}
log.Info("fs init")
log.Info("Async fs br init")
if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, r.sourceTargetPath, pvr.Spec.UploaderSettings); err != nil {
return "", errors.Wrap(err, "error starting data path restore")
@ -212,56 +213,26 @@ func (r *RestoreMicroService) Shutdown() {
r.closeDataPath(r.ctx, r.pvrName)
if r.pvrHandler != nil {
if err := r.ddInformer.RemoveEventHandler(r.pvrHandler); err != nil {
if err := r.pvrInformer.RemoveEventHandler(r.pvrHandler); err != nil {
r.logger.WithError(err).Warn("Failed to remove pod handler")
}
}
}
var funcWriteCompletionMark = writeCompletionMark
func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
log := r.logger.WithField("PVR", pvrName)
volumePath := result.Restore.Target.ByPath
if volumePath == "" {
r.recordPvrFailed(pvrName, "invalid restore target", errors.New("path is empty"))
return
}
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
// for the one specific to the restore being executed).
if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
log.WithError(err).Warnf("error removing .velero directory from directory %s", volumePath)
}
var restoreUID types.UID
for _, owner := range r.pvr.OwnerReferences {
if boolptr.IsSetToTrue(owner.Controller) {
restoreUID = owner.UID
break
}
}
// Create the .velero directory within the volume dir so we can write a done file
// for this restore.
if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
r.recordPvrFailed(pvrName, "error creating .velero directory for done file", err)
return
}
// Write a done file with name=<restore-uid> into the just-created .velero dir
// within the volume. The velero init container on the pod is waiting
// for this file to exist in each restored volume before completing.
if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check.
r.recordPvrFailed(pvrName, "error writing done file", err)
return
err := funcWriteCompletionMark(r.pvr, result.Restore, log)
if err != nil {
log.WithError(err).Warnf("Failed to write completion mark, restored pod may failed to start")
}
restoreBytes, err := funcMarshal(result.Restore)
if err != nil {
log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
r.recordPvrFailed(pvrName, fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
r.recordPvrFailed(fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
} else {
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCompleted, string(restoreBytes))
r.resultSignal <- dataPathResult{
@ -272,11 +243,11 @@ func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace stri
log.Info("Async fs restore data path completed")
}
func (r *RestoreMicroService) recordPvrFailed(pvrName string, msg string, err error) {
func (r *RestoreMicroService) recordPvrFailed(msg string, err error) {
evtMsg := fmt.Sprintf("%s, error %v", msg, err)
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonFailed, evtMsg)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, msg, pvrName),
err: errors.Wrapf(err, msg),
}
}
@ -284,7 +255,7 @@ func (r *RestoreMicroService) OnPvrFailed(ctx context.Context, namespace string,
log := r.logger.WithField("PVR", pvrName)
log.WithError(err).Error("Async fs restore data path failed")
r.recordPvrFailed(pvrName, fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
r.recordPvrFailed(fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
}
func (r *RestoreMicroService) OnPvrCancelled(ctx context.Context, namespace string, pvrName string) {
@ -311,13 +282,13 @@ func (r *RestoreMicroService) OnPvrProgress(ctx context.Context, namespace strin
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonProgress, string(progressBytes))
}
func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) {
fsRestore := r.dataPathMgr.GetAsyncBR(ddName)
func (r *RestoreMicroService) closeDataPath(ctx context.Context, pvrName string) {
fsRestore := r.dataPathMgr.GetAsyncBR(pvrName)
if fsRestore != nil {
fsRestore.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(ddName)
r.dataPathMgr.RemoveAsyncBR(pvrName)
}
func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeRestore) {
@ -332,3 +303,39 @@ func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeR
fsBackup.Cancel()
}
}
func writeCompletionMark(pvr *velerov1api.PodVolumeRestore, result datapath.RestoreResult, log logrus.FieldLogger) error {
volumePath := result.Target.ByPath
if volumePath == "" {
return errors.New("target volume is empty in restore result")
}
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
// for the one specific to the restore being executed).
if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
log.WithError(err).Warnf("Failed to remove .velero directory from directory %s", volumePath)
}
if len(pvr.OwnerReferences) == 0 {
return errors.New("error finding restore UID")
}
restoreUID := pvr.OwnerReferences[0].UID
// Create the .velero directory within the volume dir so we can write a done file
// for this restore.
if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
return errors.Wrapf(err, "error creating .velero directory for done file")
}
// Write a done file with name=<restore-uid> into the just-created .velero dir
// within the volume. The velero init container on the pod is waiting
// for this file to exist in each restored volume before completing.
if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check.
return errors.Wrapf(err, "error writing done file")
}
return nil
}

View File

@ -0,0 +1,470 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/uploader"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
)
type restoreMsTestHelper struct {
eventReason string
eventMsg string
marshalErr error
marshalBytes []byte
withEvent bool
eventLock sync.Mutex
writeCompletionErr error
}
func (rt *restoreMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) {
rt.eventLock.Lock()
defer rt.eventLock.Unlock()
rt.withEvent = true
rt.eventReason = reason
rt.eventMsg = fmt.Sprintf(message, a...)
}
func (rt *restoreMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
rt.eventLock.Lock()
defer rt.eventLock.Unlock()
rt.withEvent = true
rt.eventReason = reason
rt.eventMsg = fmt.Sprintf(message, a...)
}
func (rt *restoreMsTestHelper) Shutdown() {}
func (rt *restoreMsTestHelper) Marshal(v any) ([]byte, error) {
if rt.marshalErr != nil {
return nil, rt.marshalErr
}
return rt.marshalBytes, nil
}
func (rt *restoreMsTestHelper) EventReason() string {
rt.eventLock.Lock()
defer rt.eventLock.Unlock()
return rt.eventReason
}
func (rt *restoreMsTestHelper) EventMessage() string {
rt.eventLock.Lock()
defer rt.eventLock.Unlock()
return rt.eventMsg
}
func (rt *restoreMsTestHelper) WriteCompletionMark(*velerov1api.PodVolumeRestore, datapath.RestoreResult, logrus.FieldLogger) error {
return rt.writeCompletionErr
}
func TestOnPvrFailed(t *testing.T) {
pvrName := "fake-pvr"
rt := &restoreMsTestHelper{}
rs := &RestoreMicroService{
pvrName: pvrName,
dataPathMgr: datapath.NewManager(1),
eventRecorder: rt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
expectedErr := "Data path for PVR fake-pvr failed: fake-error"
expectedEventReason := datapath.EventReasonFailed
expectedEventMsg := "Data path for PVR fake-pvr failed, error fake-error"
go rs.OnPvrFailed(context.TODO(), velerov1api.DefaultNamespace, pvrName, errors.New("fake-error"))
result := <-rs.resultSignal
assert.EqualError(t, result.err, expectedErr)
assert.Equal(t, expectedEventReason, rt.EventReason())
assert.Equal(t, expectedEventMsg, rt.EventMessage())
}
func TestPvrCancelled(t *testing.T) {
pvrName := "fake-pvr"
rt := &restoreMsTestHelper{}
rs := RestoreMicroService{
pvrName: pvrName,
dataPathMgr: datapath.NewManager(1),
eventRecorder: rt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
expectedErr := datapath.ErrCancelled
expectedEventReason := datapath.EventReasonCancelled
expectedEventMsg := "Data path for PVR fake-pvr canceled"
go rs.OnPvrCancelled(context.TODO(), velerov1api.DefaultNamespace, pvrName)
result := <-rs.resultSignal
assert.EqualError(t, result.err, expectedErr)
assert.Equal(t, expectedEventReason, rt.EventReason())
assert.Equal(t, expectedEventMsg, rt.EventMessage())
}
func TestOnPvrCompleted(t *testing.T) {
tests := []struct {
name string
expectedErr string
expectedEventReason string
expectedEventMsg string
marshalErr error
marshallStr string
writeCompletionErr error
expectedLog string
}{
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "error marshaling restore result {{ } 0}: fake-marshal-error",
},
{
name: "succeed",
marshallStr: "fake-complete-string",
expectedEventReason: datapath.EventReasonCompleted,
expectedEventMsg: "fake-complete-string",
},
{
name: "succeed but write completion mark fail",
marshallStr: "fake-complete-string",
writeCompletionErr: errors.New("fake-write-completion-error"),
expectedEventReason: datapath.EventReasonCompleted,
expectedEventMsg: "fake-complete-string",
expectedLog: "Failed to write completion mark, restored pod may failed to start",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pvrName := "fake-pvr"
rt := &restoreMsTestHelper{
marshalErr: test.marshalErr,
marshalBytes: []byte(test.marshallStr),
writeCompletionErr: test.writeCompletionErr,
}
logBuffer := []string{}
rs := &RestoreMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: rt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewMultipleLogger(&logBuffer),
}
funcMarshal = rt.Marshal
funcWriteCompletionMark = rt.WriteCompletionMark
go rs.OnPvrCompleted(context.TODO(), velerov1api.DefaultNamespace, pvrName, datapath.Result{})
result := <-rs.resultSignal
if test.marshalErr != nil {
assert.EqualError(t, result.err, test.expectedErr)
} else {
assert.NoError(t, result.err)
assert.Equal(t, test.expectedEventReason, rt.EventReason())
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
if test.expectedLog != "" {
assert.Contains(t, logBuffer[0], test.expectedLog)
}
}
})
}
}
func TestOnPvrProgress(t *testing.T) {
tests := []struct {
name string
expectedErr string
expectedEventReason string
expectedEventMsg string
marshalErr error
marshallStr string
}{
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal restore result",
},
{
name: "succeed",
marshallStr: "fake-progress-string",
expectedEventReason: datapath.EventReasonProgress,
expectedEventMsg: "fake-progress-string",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pvrName := "fake-pvr"
rt := &restoreMsTestHelper{
marshalErr: test.marshalErr,
marshalBytes: []byte(test.marshallStr),
}
rs := &RestoreMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: rt,
logger: velerotest.NewLogger(),
}
funcMarshal = rt.Marshal
rs.OnPvrProgress(context.TODO(), velerov1api.DefaultNamespace, pvrName, &uploader.Progress{})
if test.marshalErr != nil {
assert.False(t, rt.withEvent)
} else {
assert.True(t, rt.withEvent)
assert.Equal(t, test.expectedEventReason, rt.EventReason())
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
}
})
}
}
func TestCancelPodVolumeRestore(t *testing.T) {
tests := []struct {
name string
expectedEventReason string
expectedEventMsg string
expectedErr string
}{
{
name: "no fs restore",
expectedEventReason: datapath.EventReasonCancelled,
expectedEventMsg: "Data path for PVR fake-pvr canceled",
expectedErr: datapath.ErrCancelled,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pvrName := "fake-pvr"
pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result()
rt := &restoreMsTestHelper{}
rs := &RestoreMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: rt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
go rs.cancelPodVolumeRestore(pvr)
result := <-rs.resultSignal
assert.EqualError(t, result.err, test.expectedErr)
assert.True(t, rt.withEvent)
assert.Equal(t, test.expectedEventReason, rt.EventReason())
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
})
}
}
func TestRunCancelableDataPathRestore(t *testing.T) {
pvrName := "fake-pvr"
pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseNew).Result()
pvrInProgress := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result()
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
tests := []struct {
name string
ctx context.Context
result *dataPathResult
dataPathMgr *datapath.Manager
kubeClientObj []runtime.Object
initErr error
startErr error
dataPathStarted bool
expectedEventMsg string
expectedErr string
}{
{
name: "no pvr",
ctx: ctxTimeout,
expectedErr: "error waiting for PVR: context deadline exceeded",
},
{
name: "pvr not in in-progress",
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{pvr},
expectedErr: "error waiting for PVR: context deadline exceeded",
},
{
name: "create data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{pvrInProgress},
dataPathMgr: datapath.NewManager(0),
expectedErr: "error to create data path: Concurrent number exceeds",
},
{
name: "init data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{pvrInProgress},
initErr: errors.New("fake-init-error"),
expectedErr: "error to initialize data path: fake-init-error",
},
{
name: "start data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{pvrInProgress},
startErr: errors.New("fake-start-error"),
expectedErr: "error starting data path restore: fake-start-error",
},
{
name: "data path timeout",
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{pvrInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
expectedErr: "timed out waiting for fs restore to complete",
},
{
name: "data path returns error",
ctx: context.Background(),
kubeClientObj: []runtime.Object{pvrInProgress},
dataPathStarted: true,
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
expectedErr: "fake-data-path-error",
},
{
name: "succeed",
ctx: context.Background(),
kubeClientObj: []runtime.Object{pvrInProgress},
dataPathStarted: true,
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
},
}
scheme := runtime.NewScheme()
velerov1api.AddToScheme(scheme)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClientBuilder := clientFake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
rt := &restoreMsTestHelper{}
rs := &RestoreMicroService{
namespace: velerov1api.DefaultNamespace,
pvrName: pvrName,
ctx: context.Background(),
client: fakeClient,
dataPathMgr: datapath.NewManager(1),
eventRecorder: rt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
if test.ctx != nil {
rs.ctx = test.ctx
}
if test.dataPathMgr != nil {
rs.dataPathMgr = test.dataPathMgr
}
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
fsBR := datapathmockes.NewAsyncBR(t)
if test.initErr != nil {
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
}
if test.startErr != nil {
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
}
if test.dataPathStarted {
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(nil)
}
return fsBR
}
if test.result != nil {
go func() {
time.Sleep(time.Millisecond * 500)
rs.resultSignal <- *test.result
}()
}
result, err := rs.RunCancelableDataPath(test.ctx)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.result.result, result)
}
if test.expectedEventMsg != "" {
assert.True(t, rt.withEvent)
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
}
})
}
cancel()
}